Skip to main content

meerkat_runtime/
runtime_event.rs

1//! §7 RuntimeEvent — full event hierarchy for runtime observability.
2//!
3//! Five categories: InputLifecycle, RunLifecycle, RuntimeState, Topology, Projection.
4
5use chrono::{DateTime, Utc};
6use meerkat_core::TurnErrorMetadata;
7use meerkat_core::lifecycle::{InputId, RunId};
8use serde::{Deserialize, Serialize};
9
10use crate::identifiers::{
11    CausationId, CorrelationId, EventCodeId, LogicalRuntimeId, RuntimeEventId,
12};
13use crate::input_state::{InputAbandonReason, InputLifecycleState};
14use crate::runtime_state::RuntimeState;
15
16/// Envelope for all runtime events.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct RuntimeEventEnvelope {
19    /// Unique event ID.
20    pub id: RuntimeEventId,
21    /// When the event occurred.
22    pub timestamp: DateTime<Utc>,
23    /// Which runtime emitted this event.
24    pub runtime_id: LogicalRuntimeId,
25    /// The event payload.
26    pub event: RuntimeEvent,
27    /// Causation chain.
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub causation_id: Option<CausationId>,
30    /// Correlation for cross-boundary tracing.
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub correlation_id: Option<CorrelationId>,
33}
34
35/// Runtime event categories and codes.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(tag = "category", content = "data", rename_all = "snake_case")]
38#[non_exhaustive]
39pub enum RuntimeEvent {
40    /// Input lifecycle state transitions.
41    InputLifecycle(InputLifecycleEvent),
42    /// Run lifecycle state transitions.
43    RunLifecycle(RunLifecycleEvent),
44    /// Runtime state transitions.
45    RuntimeStateChange(RuntimeStateChangeEvent),
46    /// Topology changes (agents joining/leaving).
47    Topology(RuntimeTopologyEvent),
48    /// Projection-derived events.
49    Projection(RuntimeProjectionEvent),
50}
51
52impl RuntimeEvent {
53    /// Get the stable event code for wire formats.
54    pub fn event_code(&self) -> EventCodeId {
55        match self {
56            RuntimeEvent::InputLifecycle(e) => e.event_code(),
57            RuntimeEvent::RunLifecycle(e) => e.event_code(),
58            RuntimeEvent::RuntimeStateChange(_) => EventCodeId::new("runtime.state_changed"),
59            RuntimeEvent::Topology(e) => e.event_code(),
60            RuntimeEvent::Projection(_) => EventCodeId::new("runtime.projection_emitted"),
61        }
62    }
63}
64
65/// Input lifecycle events.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(tag = "code", rename_all = "snake_case")]
68#[non_exhaustive]
69pub enum InputLifecycleEvent {
70    /// Input accepted into the system.
71    Accepted { input_id: InputId },
72    /// Input deduplicated (idempotency key matched).
73    Deduplicated {
74        input_id: InputId,
75        existing_id: InputId,
76    },
77    /// Input superseded by a newer input.
78    Superseded {
79        input_id: InputId,
80        superseded_by: InputId,
81    },
82    /// Input coalesced into an aggregate.
83    Coalesced {
84        input_id: InputId,
85        aggregate_id: InputId,
86    },
87    /// Input queued for processing.
88    Queued { input_id: InputId },
89    /// Input staged at a run boundary.
90    Staged { input_id: InputId, run_id: RunId },
91    /// Input applied (boundary primitive executed).
92    Applied { input_id: InputId, run_id: RunId },
93    /// Input consumed (run completed successfully).
94    Consumed { input_id: InputId, run_id: RunId },
95    /// Input abandoned (retire/reset/destroy).
96    ///
97    /// Carries the typed abandon cause (`InputAbandonReason`) sourced from the
98    /// terminal outcome, not a stringified label.
99    Abandoned {
100        input_id: InputId,
101        reason: InputAbandonReason,
102    },
103    /// Input state transitioned.
104    StateTransitioned {
105        input_id: InputId,
106        from: InputLifecycleState,
107        to: InputLifecycleState,
108    },
109}
110
111impl InputLifecycleEvent {
112    pub fn event_code(&self) -> EventCodeId {
113        match self {
114            Self::Accepted { .. } => EventCodeId::new("input.accepted"),
115            Self::Deduplicated { .. } => EventCodeId::new("input.deduplicated"),
116            Self::Superseded { .. } => EventCodeId::new("input.superseded"),
117            Self::Coalesced { .. } => EventCodeId::new("input.coalesced"),
118            Self::Queued { .. } => EventCodeId::new("input.queued"),
119            Self::Staged { .. } => EventCodeId::new("input.staged"),
120            Self::Applied { .. } => EventCodeId::new("input.applied"),
121            Self::Consumed { .. } => EventCodeId::new("input.consumed"),
122            Self::Abandoned { .. } => EventCodeId::new("input.abandoned"),
123            Self::StateTransitioned { .. } => EventCodeId::new("input.state_transitioned"),
124        }
125    }
126}
127
128/// Run lifecycle events.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130#[serde(tag = "code", rename_all = "snake_case")]
131#[non_exhaustive]
132pub enum RunLifecycleEvent {
133    /// A run started.
134    Started { run_id: RunId },
135    /// A run completed successfully.
136    Completed { run_id: RunId },
137    /// A run failed.
138    ///
139    /// Carries the typed turn-failure cause (`TurnErrorMetadata`) instead of a
140    /// stringified error. Recoverability is conveyed by
141    /// `TurnErrorMetadata::retryable`; human-readable diagnostics remain in
142    /// `TurnErrorMetadata::detail`.
143    Failed {
144        run_id: RunId,
145        error: TurnErrorMetadata,
146    },
147    /// A run was cancelled.
148    Cancelled { run_id: RunId },
149}
150
151impl RunLifecycleEvent {
152    pub fn event_code(&self) -> EventCodeId {
153        match self {
154            Self::Started { .. } => EventCodeId::new("run.started"),
155            Self::Completed { .. } => EventCodeId::new("run.completed"),
156            Self::Failed { .. } => EventCodeId::new("run.failed"),
157            Self::Cancelled { .. } => EventCodeId::new("run.cancelled"),
158        }
159    }
160}
161
162/// Runtime state change events.
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct RuntimeStateChangeEvent {
165    pub from: RuntimeState,
166    pub to: RuntimeState,
167}
168
169/// Topology events (agents joining/leaving the runtime).
170#[derive(Debug, Clone, Serialize, Deserialize)]
171#[serde(tag = "code", rename_all = "snake_case")]
172#[non_exhaustive]
173pub enum RuntimeTopologyEvent {
174    /// A runtime instance was created.
175    RuntimeCreated { runtime_id: LogicalRuntimeId },
176    /// A runtime instance was retired.
177    RuntimeRetired { runtime_id: LogicalRuntimeId },
178    /// A runtime instance was recycled (driver reset and state recovered).
179    RuntimeRecycled { runtime_id: LogicalRuntimeId },
180    /// A runtime instance was destroyed.
181    RuntimeDestroyed { runtime_id: LogicalRuntimeId },
182}
183
184impl RuntimeTopologyEvent {
185    pub fn event_code(&self) -> EventCodeId {
186        match self {
187            Self::RuntimeCreated { .. } => EventCodeId::new("topology.runtime_created"),
188            Self::RuntimeRetired { .. } => EventCodeId::new("topology.runtime_retired"),
189            Self::RuntimeRecycled { .. } => EventCodeId::new("topology.runtime_recycled"),
190            Self::RuntimeDestroyed { .. } => EventCodeId::new("topology.runtime_destroyed"),
191        }
192    }
193}
194
195/// Projection-derived events.
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct RuntimeProjectionEvent {
198    /// The projection rule that generated this event.
199    pub rule_id: String,
200    /// The projected input ID.
201    pub projected_input_id: InputId,
202    /// Source event that triggered the projection.
203    pub source_event_id: RuntimeEventId,
204}
205
206#[cfg(test)]
207#[allow(clippy::unwrap_used)]
208mod tests {
209    use super::*;
210
211    fn make_envelope(event: RuntimeEvent) -> RuntimeEventEnvelope {
212        RuntimeEventEnvelope {
213            id: RuntimeEventId::new(),
214            timestamp: Utc::now(),
215            runtime_id: LogicalRuntimeId::new("test-runtime"),
216            event,
217            causation_id: None,
218            correlation_id: None,
219        }
220    }
221
222    #[test]
223    fn input_lifecycle_accepted_serde() {
224        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
225            input_id: InputId::new(),
226        });
227        let envelope = make_envelope(event);
228        let json = serde_json::to_value(&envelope).unwrap();
229        assert_eq!(json["event"]["category"], "input_lifecycle");
230        assert_eq!(json["event"]["data"]["code"], "accepted");
231        let parsed: RuntimeEventEnvelope = serde_json::from_value(json).unwrap();
232        assert!(matches!(
233            parsed.event,
234            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted { .. })
235        ));
236    }
237
238    #[test]
239    fn input_lifecycle_deduplicated_serde() {
240        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated {
241            input_id: InputId::new(),
242            existing_id: InputId::new(),
243        });
244        let json = serde_json::to_value(&event).unwrap();
245        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
246        assert!(matches!(
247            parsed,
248            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated { .. })
249        ));
250    }
251
252    #[test]
253    fn input_lifecycle_superseded_serde() {
254        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded {
255            input_id: InputId::new(),
256            superseded_by: InputId::new(),
257        });
258        let json = serde_json::to_value(&event).unwrap();
259        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
260        assert!(matches!(
261            parsed,
262            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded { .. })
263        ));
264    }
265
266    #[test]
267    fn input_lifecycle_coalesced_serde() {
268        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced {
269            input_id: InputId::new(),
270            aggregate_id: InputId::new(),
271        });
272        let json = serde_json::to_value(&event).unwrap();
273        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
274        assert!(matches!(
275            parsed,
276            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced { .. })
277        ));
278    }
279
280    #[test]
281    fn run_lifecycle_started_serde() {
282        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started {
283            run_id: RunId::new(),
284        });
285        let json = serde_json::to_value(&event).unwrap();
286        assert_eq!(json["category"], "run_lifecycle");
287        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
288        assert!(matches!(
289            parsed,
290            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started { .. })
291        ));
292    }
293
294    #[test]
295    fn run_lifecycle_failed_serde() {
296        let mut metadata = TurnErrorMetadata::runtime_apply_failure("timeout");
297        metadata.retryable = Some(true);
298        let event = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
299            run_id: RunId::new(),
300            error: metadata,
301        });
302        let json = serde_json::to_value(&event).unwrap();
303        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
304        let RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed { error, .. }) = parsed else {
305            panic!("expected run lifecycle failed event");
306        };
307        assert_eq!(error.retryable, Some(true));
308        assert_eq!(
309            error.kind,
310            meerkat_core::TurnTerminalCauseKind::RuntimeApplyFailure
311        );
312    }
313
314    #[test]
315    fn input_lifecycle_abandoned_carries_typed_reason() {
316        let event = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Abandoned {
317            input_id: InputId::new(),
318            reason: InputAbandonReason::MaxAttemptsExhausted { attempts: 3 },
319        });
320        let json = serde_json::to_value(&event).unwrap();
321        // RuntimeEvent is adjacently tagged (category/data), so the inner
322        // InputLifecycleEvent `code` discriminator lives under `data`.
323        assert_eq!(json["category"], "input_lifecycle");
324        assert_eq!(json["data"]["code"], "abandoned");
325        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
326        let RuntimeEvent::InputLifecycle(InputLifecycleEvent::Abandoned { reason, .. }) = parsed
327        else {
328            panic!("expected input lifecycle abandoned event");
329        };
330        assert_eq!(
331            reason,
332            InputAbandonReason::MaxAttemptsExhausted { attempts: 3 }
333        );
334    }
335
336    #[test]
337    fn runtime_state_change_serde() {
338        let event = RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
339            from: RuntimeState::Idle,
340            to: RuntimeState::Running,
341        });
342        let json = serde_json::to_value(&event).unwrap();
343        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
344        assert!(matches!(
345            parsed,
346            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
347                from: RuntimeState::Idle,
348                to: RuntimeState::Running,
349            })
350        ));
351    }
352
353    #[test]
354    fn topology_created_serde() {
355        let event = RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated {
356            runtime_id: LogicalRuntimeId::new("mob-agent-1"),
357        });
358        let json = serde_json::to_value(&event).unwrap();
359        assert_eq!(json["category"], "topology");
360        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
361        assert!(matches!(
362            parsed,
363            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated { .. })
364        ));
365    }
366
367    #[test]
368    fn event_code_coverage() {
369        let events = vec![
370            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
371                input_id: InputId::new(),
372            }),
373            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Completed {
374                run_id: RunId::new(),
375            }),
376            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
377                from: RuntimeState::Idle,
378                to: RuntimeState::Running,
379            }),
380            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeRetired {
381                runtime_id: LogicalRuntimeId::new("x"),
382            }),
383            RuntimeEvent::Projection(RuntimeProjectionEvent {
384                rule_id: "rule-1".into(),
385                projected_input_id: InputId::new(),
386                source_event_id: RuntimeEventId::new(),
387            }),
388        ];
389        for event in &events {
390            let code = event.event_code();
391            assert!(!code.0.is_empty());
392        }
393    }
394}