use chrono::{DateTime, Utc};
use meerkat_core::lifecycle::{InputId, RunId};
use serde::{Deserialize, Serialize};
use crate::identifiers::{
CausationId, CorrelationId, EventCodeId, LogicalRuntimeId, RuntimeEventId,
};
use crate::input_state::InputLifecycleState;
use crate::runtime_state::RuntimeState;
/// Envelope that carries a single [`RuntimeEvent`] together with its metadata.
///
/// The two optional ids are left out of the serialized form entirely when they
/// are `None` (see the `skip_serializing_if` attributes on those fields).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeEventEnvelope {
/// Identifier of this event envelope.
pub id: RuntimeEventId,
/// UTC timestamp attached to the event.
// NOTE(review): presumably the emission time — confirm against the producer.
pub timestamp: DateTime<Utc>,
/// Id of the logical runtime this envelope is associated with.
// NOTE(review): presumably the runtime that emitted the event — confirm.
pub runtime_id: LogicalRuntimeId,
/// The event payload.
pub event: RuntimeEvent,
/// Optional causation id; not serialized when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub causation_id: Option<CausationId>,
/// Optional correlation id; not serialized when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<CorrelationId>,
}
/// Top-level runtime event, grouped by category.
///
/// Serialized adjacently tagged: the snake_case variant name is written under
/// the `category` key and the wrapped payload under the `data` key.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "category", content = "data", rename_all = "snake_case")]
#[non_exhaustive]
pub enum RuntimeEvent {
/// Wraps an [`InputLifecycleEvent`]; serialized category `input_lifecycle`.
InputLifecycle(InputLifecycleEvent),
/// Wraps a [`RunLifecycleEvent`]; serialized category `run_lifecycle`.
RunLifecycle(RunLifecycleEvent),
/// Wraps a [`RuntimeStateChangeEvent`]; serialized category `runtime_state_change`.
RuntimeStateChange(RuntimeStateChangeEvent),
/// Wraps a [`RuntimeTopologyEvent`]; serialized category `topology`.
Topology(RuntimeTopologyEvent),
/// Wraps a [`RuntimeProjectionEvent`]; serialized category `projection`.
Projection(RuntimeProjectionEvent),
}
impl RuntimeEvent {
    /// Returns the [`EventCodeId`] for this event.
    ///
    /// State-change and projection events each map to one fixed code; input,
    /// run and topology events delegate to the wrapped event's own
    /// `event_code` method.
    pub fn event_code(&self) -> EventCodeId {
        match self {
            // Categories with exactly one code.
            Self::RuntimeStateChange(_) => EventCodeId::new("runtime.state_changed"),
            Self::Projection(_) => EventCodeId::new("runtime.projection_emitted"),
            // Categories whose code depends on the inner variant.
            Self::InputLifecycle(input) => input.event_code(),
            Self::RunLifecycle(run) => run.event_code(),
            Self::Topology(topology) => topology.event_code(),
        }
    }
}
/// Events about the lifecycle of a single input.
///
/// Serialized internally tagged: the snake_case variant name is written under
/// the `code` key next to the variant's own fields.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
// NOTE(review): the per-variant descriptions below are read off the variant
// and field names; confirm exact semantics against the emitting code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "code", rename_all = "snake_case")]
#[non_exhaustive]
pub enum InputLifecycleEvent {
/// The input `input_id` was accepted.
Accepted { input_id: InputId },
/// The input `input_id` was deduplicated; `existing_id` names another input
/// (presumably the one it duplicates).
Deduplicated {
input_id: InputId,
existing_id: InputId,
},
/// The input `input_id` was superseded by the input `superseded_by`.
Superseded {
input_id: InputId,
superseded_by: InputId,
},
/// The input `input_id` was coalesced; `aggregate_id` names another input
/// (presumably the aggregate it was merged into).
Coalesced {
input_id: InputId,
aggregate_id: InputId,
},
/// The input `input_id` was queued.
Queued { input_id: InputId },
/// The input `input_id` was staged; carries the related `run_id`.
Staged { input_id: InputId, run_id: RunId },
/// The input `input_id` was applied; carries the related `run_id`.
Applied { input_id: InputId, run_id: RunId },
/// The input `input_id` was consumed; carries the related `run_id`.
Consumed { input_id: InputId, run_id: RunId },
/// The input `input_id` was abandoned, with a free-form `reason` string.
Abandoned { input_id: InputId, reason: String },
/// The input `input_id` moved from lifecycle state `from` to state `to`.
StateTransitioned {
input_id: InputId,
from: InputLifecycleState,
to: InputLifecycleState,
},
}
impl InputLifecycleEvent {
    /// Returns the `input.*` [`EventCodeId`] that corresponds to this variant.
    ///
    /// The variant's fields are ignored; only the variant itself selects the code.
    pub fn event_code(&self) -> EventCodeId {
        // Pick the code string first, then wrap it once.
        let code = match self {
            Self::Accepted { .. } => "input.accepted",
            Self::Deduplicated { .. } => "input.deduplicated",
            Self::Superseded { .. } => "input.superseded",
            Self::Coalesced { .. } => "input.coalesced",
            Self::Queued { .. } => "input.queued",
            Self::Staged { .. } => "input.staged",
            Self::Applied { .. } => "input.applied",
            Self::Consumed { .. } => "input.consumed",
            Self::Abandoned { .. } => "input.abandoned",
            Self::StateTransitioned { .. } => "input.state_transitioned",
        };
        EventCodeId::new(code)
    }
}
/// Events about the lifecycle of a run.
///
/// Serialized internally tagged: the snake_case variant name is written under
/// the `code` key next to the variant's own fields.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "code", rename_all = "snake_case")]
#[non_exhaustive]
pub enum RunLifecycleEvent {
/// The run `run_id` started.
Started { run_id: RunId },
/// The run `run_id` completed.
Completed { run_id: RunId },
/// The run `run_id` failed with the message `error`.
// NOTE(review): `recoverable` presumably signals whether the failure can be
// retried or recovered from — confirm against the producer.
Failed {
run_id: RunId,
error: String,
recoverable: bool,
},
/// The run `run_id` was cancelled.
Cancelled { run_id: RunId },
}
impl RunLifecycleEvent {
    /// Returns the `run.*` [`EventCodeId`] that corresponds to this variant.
    ///
    /// The variant's fields are ignored; only the variant itself selects the code.
    pub fn event_code(&self) -> EventCodeId {
        // Pick the code string first, then wrap it once.
        let code = match self {
            Self::Started { .. } => "run.started",
            Self::Completed { .. } => "run.completed",
            Self::Failed { .. } => "run.failed",
            Self::Cancelled { .. } => "run.cancelled",
        };
        EventCodeId::new(code)
    }
}
/// Payload of a runtime state change: the state before and the state after.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeStateChangeEvent {
/// State the runtime changed from.
pub from: RuntimeState,
/// State the runtime changed to.
pub to: RuntimeState,
}
/// Events about logical runtimes being created, retired, recycled or destroyed.
///
/// Serialized internally tagged: the snake_case variant name is written under
/// the `code` key next to the variant's `runtime_id` field.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "code", rename_all = "snake_case")]
#[non_exhaustive]
pub enum RuntimeTopologyEvent {
/// The runtime `runtime_id` was created.
RuntimeCreated { runtime_id: LogicalRuntimeId },
/// The runtime `runtime_id` was retired.
RuntimeRetired { runtime_id: LogicalRuntimeId },
/// The runtime `runtime_id` was recycled.
RuntimeRecycled { runtime_id: LogicalRuntimeId },
/// The runtime `runtime_id` was destroyed.
RuntimeDestroyed { runtime_id: LogicalRuntimeId },
}
impl RuntimeTopologyEvent {
    /// Returns the `topology.*` [`EventCodeId`] that corresponds to this variant.
    ///
    /// The `runtime_id` field is ignored; only the variant itself selects the code.
    pub fn event_code(&self) -> EventCodeId {
        // Pick the code string first, then wrap it once.
        let code = match self {
            Self::RuntimeCreated { .. } => "topology.runtime_created",
            Self::RuntimeRetired { .. } => "topology.runtime_retired",
            Self::RuntimeRecycled { .. } => "topology.runtime_recycled",
            Self::RuntimeDestroyed { .. } => "topology.runtime_destroyed",
        };
        EventCodeId::new(code)
    }
}
/// Payload of a projection event, linking a rule, an input and a source event.
// NOTE(review): field meanings below are inferred from their names — confirm
// against the code that emits projection events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeProjectionEvent {
/// Identifier of a rule, as a free-form string (presumably the rule that
/// produced the projection).
pub rule_id: String,
/// Id of the projected input.
pub projected_input_id: InputId,
/// Id of the runtime event the projection refers to as its source.
pub source_event_id: RuntimeEventId,
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Wraps `event` in an envelope with a fresh id, the current time, the
    /// fixed runtime id `test-runtime`, and no causation/correlation ids.
    fn make_envelope(event: RuntimeEvent) -> RuntimeEventEnvelope {
        let id = RuntimeEventId::new();
        let timestamp = Utc::now();
        let runtime_id = LogicalRuntimeId::new("test-runtime");
        RuntimeEventEnvelope {
            id,
            timestamp,
            runtime_id,
            event,
            causation_id: None,
            correlation_id: None,
        }
    }

    /// Serializes `event` to a JSON value and deserializes that value back.
    fn reparse(event: &RuntimeEvent) -> RuntimeEvent {
        let json = serde_json::to_value(event).unwrap();
        serde_json::from_value(json).unwrap()
    }

    /// An envelope round-trips and exposes the `category`/`data.code` tags.
    #[test]
    fn input_lifecycle_accepted_serde() {
        let accepted = InputLifecycleEvent::Accepted {
            input_id: InputId::new(),
        };
        let envelope = make_envelope(RuntimeEvent::InputLifecycle(accepted));

        let json = serde_json::to_value(&envelope).unwrap();
        assert_eq!(json["event"]["category"], "input_lifecycle");
        assert_eq!(json["event"]["data"]["code"], "accepted");

        let parsed: RuntimeEventEnvelope = serde_json::from_value(json).unwrap();
        let is_accepted = matches!(
            parsed.event,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted { .. })
        );
        assert!(is_accepted);
    }

    /// `Deduplicated` survives a JSON round trip.
    #[test]
    fn input_lifecycle_deduplicated_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated {
            input_id: InputId::new(),
            existing_id: InputId::new(),
        });

        let parsed = reparse(&original);

        let is_deduplicated = matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Deduplicated { .. })
        );
        assert!(is_deduplicated);
    }

    /// `Superseded` survives a JSON round trip.
    #[test]
    fn input_lifecycle_superseded_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded {
            input_id: InputId::new(),
            superseded_by: InputId::new(),
        });

        let parsed = reparse(&original);

        let is_superseded = matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Superseded { .. })
        );
        assert!(is_superseded);
    }

    /// `Coalesced` survives a JSON round trip.
    #[test]
    fn input_lifecycle_coalesced_serde() {
        let original = RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced {
            input_id: InputId::new(),
            aggregate_id: InputId::new(),
        });

        let parsed = reparse(&original);

        let is_coalesced = matches!(
            parsed,
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Coalesced { .. })
        );
        assert!(is_coalesced);
    }

    /// `Started` serializes under the `run_lifecycle` category and round-trips.
    #[test]
    fn run_lifecycle_started_serde() {
        let original = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started {
            run_id: RunId::new(),
        });

        let json = serde_json::to_value(&original).unwrap();
        assert_eq!(json["category"], "run_lifecycle");

        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
        let is_started = matches!(
            parsed,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Started { .. })
        );
        assert!(is_started);
    }

    /// `Failed` round-trips and keeps its `recoverable` flag.
    #[test]
    fn run_lifecycle_failed_serde() {
        let original = RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
            run_id: RunId::new(),
            error: String::from("timeout"),
            recoverable: true,
        });

        let parsed = reparse(&original);

        let is_recoverable_failure = matches!(
            parsed,
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Failed {
                recoverable: true,
                ..
            })
        );
        assert!(is_recoverable_failure);
    }

    /// A state change round-trips with both states intact.
    #[test]
    fn runtime_state_change_serde() {
        let original = RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
            from: RuntimeState::Idle,
            to: RuntimeState::Running,
        });

        let parsed = reparse(&original);

        let is_idle_to_running = matches!(
            parsed,
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            })
        );
        assert!(is_idle_to_running);
    }

    /// `RuntimeCreated` serializes under the `topology` category and round-trips.
    #[test]
    fn topology_created_serde() {
        let original = RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated {
            runtime_id: LogicalRuntimeId::new("mob-agent-1"),
        });

        let json = serde_json::to_value(&original).unwrap();
        assert_eq!(json["category"], "topology");

        let parsed: RuntimeEvent = serde_json::from_value(json).unwrap();
        let is_created = matches!(
            parsed,
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeCreated { .. })
        );
        assert!(is_created);
    }

    /// One event per category yields a non-empty event code.
    #[test]
    fn event_code_coverage() {
        let samples = [
            RuntimeEvent::InputLifecycle(InputLifecycleEvent::Accepted {
                input_id: InputId::new(),
            }),
            RuntimeEvent::RunLifecycle(RunLifecycleEvent::Completed {
                run_id: RunId::new(),
            }),
            RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
                from: RuntimeState::Idle,
                to: RuntimeState::Running,
            }),
            RuntimeEvent::Topology(RuntimeTopologyEvent::RuntimeRetired {
                runtime_id: LogicalRuntimeId::new("x"),
            }),
            RuntimeEvent::Projection(RuntimeProjectionEvent {
                rule_id: String::from("rule-1"),
                projected_input_id: InputId::new(),
                source_event_id: RuntimeEventId::new(),
            }),
        ];

        for code in samples.iter().map(RuntimeEvent::event_code) {
            assert!(!code.0.is_empty());
        }
    }
}