use std::{collections::HashMap, convert::TryFrom};
use cloudevents::{AttributesReader, Data, Event as CloudEvent, EventBuilder, EventBuilderV10};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::model::Manifest;
use super::data::*;
/// Value written to the `source` attribute of every CloudEvent built from an [`Event`] in this
/// module (see `impl TryFrom<Event> for CloudEvent`).
pub const WADM_SOURCE: &str = "wadm";
/// Generates the boilerplate that ties a payload struct to its CloudEvent `type` string.
///
/// Both forms require that `Event` has a variant with the same name as `$t`, and both emit:
///
/// - `impl EventType for $t`, with `TYPE` set to `$type_name`
/// - `impl From<$t> for Event`, wrapping the payload in the variant of the same name
/// - `impl TryFrom<cloudevents::Event> for $t`, which checks the event's `type` attribute and
///   then deserializes the event data as JSON
///
/// Forms:
///
/// - `event_impl!(Type, "type.string")`
/// - `event_impl!(Type, "type.string", event_attr, data_attr)`: after deserializing, the
///   `data_attr` field of the payload is overwritten with `event.event_attr().to_string()`
///   (e.g. copying the CloudEvent `source` into `host_id`).
///
/// Errors returned by the generated `TryFrom`:
///
/// - `ConversionError::WrongEvent` (handing the event back) if the `type` attribute does not
///   equal `$type_name`
/// - `ConversionError::NoData` if the event carries no data, or carries `Data::String`
/// - `ConversionError::Deser` if the data is not valid JSON for `$t`
macro_rules! event_impl {
    ($t:ident, $type_name:expr) => {
        impl EventType for $t {
            const TYPE: &'static str = $type_name;
        }

        impl From<$t> for Event {
            fn from(value: $t) -> Event {
                Event::$t(value)
            }
        }

        impl std::convert::TryFrom<cloudevents::Event> for $t {
            type Error = ConversionError;

            fn try_from(mut value: cloudevents::Event) -> Result<Self, Self::Error> {
                if $t::TYPE != value.ty() {
                    return Err(ConversionError::WrongEvent(value));
                }
                // Only the payload is needed; content type and schema are discarded.
                let (_, _, data) = value.take_data();
                match data.ok_or(ConversionError::NoData)? {
                    // The bytes are already in memory, so parse the slice directly.
                    Data::Binary(raw) => {
                        serde_json::from_slice(&raw).map_err(ConversionError::from)
                    }
                    Data::Json(v) => serde_json::from_value(v).map_err(ConversionError::from),
                    // Plain string payloads are treated the same as a missing payload.
                    Data::String(_) => Err(ConversionError::NoData),
                }
            }
        }
    };
    ($t:ident, $type_name:expr, $event_attr:ident, $data_attr:ident) => {
        impl EventType for $t {
            const TYPE: &'static str = $type_name;
        }

        // Kept in sync with the two-argument form so every payload type converts into `Event`.
        impl From<$t> for Event {
            fn from(value: $t) -> Event {
                Event::$t(value)
            }
        }

        impl std::convert::TryFrom<cloudevents::Event> for $t {
            type Error = ConversionError;

            fn try_from(mut value: cloudevents::Event) -> Result<Self, Self::Error> {
                if $t::TYPE != value.ty() {
                    return Err(ConversionError::WrongEvent(value));
                }
                // Only the payload is needed; content type and schema are discarded.
                let (_, _, data) = value.take_data();
                let mut parsed: Self = match data.ok_or(ConversionError::NoData)? {
                    // The bytes are already in memory, so parse the slice directly.
                    Data::Binary(raw) => {
                        serde_json::from_slice(&raw).map_err(ConversionError::from)
                    }
                    Data::Json(v) => serde_json::from_value(v).map_err(ConversionError::from),
                    // Plain string payloads are treated the same as a missing payload.
                    Data::String(_) => Err(ConversionError::NoData),
                }?;
                // The envelope attribute always wins over whatever the payload contained.
                parsed.$data_attr = value.$event_attr().to_string();
                Ok(parsed)
            }
        }
    };
}
/// Associates a payload type with the CloudEvent `type` attribute string it is parsed from.
///
/// Implemented for every payload struct in this module through the `event_impl!` macro.
pub trait EventType {
/// The exact value of the CloudEvent `type` attribute for this payload.
const TYPE: &'static str;
}
/// All event payloads this module knows how to parse, one variant per payload type.
///
/// Build one from a CloudEvent with `Event::new` / `Event::try_from`.
///
/// Note that `Deserialize` is derived (serde's default enum representation) while `Serialize`
/// is implemented by hand and writes only the inner payload, so the serialized form does not
/// carry the variant name.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub enum Event {
ActorStarted(ActorStarted),
ActorsStarted(ActorsStarted),
ActorsStartFailed(ActorsStartFailed),
ActorStopped(ActorStopped),
ActorsStopped(ActorsStopped),
ProviderStarted(ProviderStarted),
ProviderStopped(ProviderStopped),
ProviderStartFailed(ProviderStartFailed),
ProviderHealthCheckPassed(ProviderHealthCheckPassed),
ProviderHealthCheckFailed(ProviderHealthCheckFailed),
ProviderHealthCheckStatus(ProviderHealthCheckStatus),
HostStarted(HostStarted),
HostStopped(HostStopped),
HostHeartbeat(HostHeartbeat),
LinkdefSet(LinkdefSet),
LinkdefDeleted(LinkdefDeleted),
ManifestPublished(ManifestPublished),
ManifestUnpublished(ManifestUnpublished),
}
impl TryFrom<CloudEvent> for Event {
type Error = ConversionError;
fn try_from(value: CloudEvent) -> Result<Self, Self::Error> {
match value.ty() {
ActorStarted::TYPE => ActorStarted::try_from(value).map(Event::ActorStarted),
ActorsStarted::TYPE => ActorsStarted::try_from(value).map(Event::ActorsStarted),
ActorsStartFailed::TYPE => {
ActorsStartFailed::try_from(value).map(Event::ActorsStartFailed)
}
ActorStopped::TYPE => ActorStopped::try_from(value).map(Event::ActorStopped),
ActorsStopped::TYPE => ActorsStopped::try_from(value).map(Event::ActorsStopped),
ProviderStarted::TYPE => ProviderStarted::try_from(value).map(Event::ProviderStarted),
ProviderStopped::TYPE => ProviderStopped::try_from(value).map(Event::ProviderStopped),
ProviderStartFailed::TYPE => {
ProviderStartFailed::try_from(value).map(Event::ProviderStartFailed)
}
ProviderHealthCheckPassed::TYPE => {
ProviderHealthCheckPassed::try_from(value).map(Event::ProviderHealthCheckPassed)
}
ProviderHealthCheckFailed::TYPE => {
ProviderHealthCheckFailed::try_from(value).map(Event::ProviderHealthCheckFailed)
}
ProviderHealthCheckStatus::TYPE => {
ProviderHealthCheckStatus::try_from(value).map(Event::ProviderHealthCheckStatus)
}
HostStarted::TYPE => HostStarted::try_from(value).map(Event::HostStarted),
HostStopped::TYPE => HostStopped::try_from(value).map(Event::HostStopped),
HostHeartbeat::TYPE => HostHeartbeat::try_from(value).map(Event::HostHeartbeat),
LinkdefSet::TYPE => LinkdefSet::try_from(value).map(Event::LinkdefSet),
LinkdefDeleted::TYPE => LinkdefDeleted::try_from(value).map(Event::LinkdefDeleted),
ManifestPublished::TYPE => {
ManifestPublished::try_from(value).map(Event::ManifestPublished)
}
ManifestUnpublished::TYPE => {
ManifestUnpublished::try_from(value).map(Event::ManifestUnpublished)
}
_ => Err(ConversionError::WrongEvent(value)),
}
}
}
/// Wraps an [`Event`] in a new CloudEvents 1.0 envelope.
///
/// The envelope gets a fresh v4 UUID as `id`, [`WADM_SOURCE`] as `source`, the current UTC time
/// as `time`, the payload's `EventType::TYPE` as `type`, and the JSON-serialized payload as
/// `application/json` data. Fails if the payload cannot be serialized or the builder rejects
/// the event.
impl TryFrom<Event> for CloudEvent {
    type Error = anyhow::Error;

    fn try_from(value: Event) -> Result<Self, Self::Error> {
        // Look up the type string first; `value` itself is consumed by serialization below.
        let event_type = match &value {
            Event::ActorStarted(_) => ActorStarted::TYPE,
            Event::ActorsStarted(_) => ActorsStarted::TYPE,
            Event::ActorsStartFailed(_) => ActorsStartFailed::TYPE,
            Event::ActorStopped(_) => ActorStopped::TYPE,
            Event::ActorsStopped(_) => ActorsStopped::TYPE,
            Event::ProviderStarted(_) => ProviderStarted::TYPE,
            Event::ProviderStopped(_) => ProviderStopped::TYPE,
            Event::ProviderStartFailed(_) => ProviderStartFailed::TYPE,
            Event::ProviderHealthCheckPassed(_) => ProviderHealthCheckPassed::TYPE,
            Event::ProviderHealthCheckFailed(_) => ProviderHealthCheckFailed::TYPE,
            Event::ProviderHealthCheckStatus(_) => ProviderHealthCheckStatus::TYPE,
            Event::HostStarted(_) => HostStarted::TYPE,
            Event::HostStopped(_) => HostStopped::TYPE,
            Event::HostHeartbeat(_) => HostHeartbeat::TYPE,
            Event::LinkdefSet(_) => LinkdefSet::TYPE,
            Event::LinkdefDeleted(_) => LinkdefDeleted::TYPE,
            Event::ManifestPublished(_) => ManifestPublished::TYPE,
            Event::ManifestUnpublished(_) => ManifestUnpublished::TYPE,
        };

        let event_id = uuid::Uuid::new_v4().to_string();
        let timestamp = chrono::Utc::now();
        let payload = serde_json::to_value(value)?;

        let cloud_event = EventBuilderV10::new()
            .id(event_id)
            .source(WADM_SOURCE)
            .time(timestamp)
            .data("application/json", payload)
            .ty(event_type)
            .build()?;
        Ok(cloud_event)
    }
}
impl Serialize for Event {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Event::ActorStarted(evt) => evt.serialize(serializer),
Event::ActorsStarted(evt) => evt.serialize(serializer),
Event::ActorsStartFailed(evt) => evt.serialize(serializer),
Event::ActorStopped(evt) => evt.serialize(serializer),
Event::ActorsStopped(evt) => evt.serialize(serializer),
Event::ProviderStarted(evt) => evt.serialize(serializer),
Event::ProviderStopped(evt) => evt.serialize(serializer),
Event::ProviderStartFailed(evt) => evt.serialize(serializer),
Event::ProviderHealthCheckPassed(evt) => evt.serialize(serializer),
Event::ProviderHealthCheckFailed(evt) => evt.serialize(serializer),
Event::ProviderHealthCheckStatus(evt) => evt.serialize(serializer),
Event::HostStarted(evt) => evt.serialize(serializer),
Event::HostStopped(evt) => evt.serialize(serializer),
Event::HostHeartbeat(evt) => evt.serialize(serializer),
Event::LinkdefSet(evt) => evt.serialize(serializer),
Event::LinkdefDeleted(evt) => evt.serialize(serializer),
Event::ManifestPublished(evt) => evt.serialize(serializer),
Event::ManifestUnpublished(evt) => evt.serialize(serializer),
}
}
}
impl Event {
pub fn new(evt: CloudEvent) -> Result<Event, ConversionError> {
Event::try_from(evt)
}
pub fn raw_type(&self) -> &str {
match self {
Event::ActorStarted(_) => ActorStarted::TYPE,
Event::ActorsStarted(_) => ActorsStarted::TYPE,
Event::ActorsStartFailed(_) => ActorsStartFailed::TYPE,
Event::ActorStopped(_) => ActorStopped::TYPE,
Event::ActorsStopped(_) => ActorsStopped::TYPE,
Event::ProviderStarted(_) => ProviderStarted::TYPE,
Event::ProviderStopped(_) => ProviderStopped::TYPE,
Event::ProviderStartFailed(_) => ProviderStopped::TYPE,
Event::ProviderHealthCheckPassed(_) => ProviderHealthCheckPassed::TYPE,
Event::ProviderHealthCheckFailed(_) => ProviderHealthCheckFailed::TYPE,
Event::ProviderHealthCheckStatus(_) => ProviderHealthCheckStatus::TYPE,
Event::HostStarted(_) => HostStarted::TYPE,
Event::HostStopped(_) => HostStopped::TYPE,
Event::HostHeartbeat(_) => HostHeartbeat::TYPE,
Event::LinkdefSet(_) => LinkdefSet::TYPE,
Event::LinkdefDeleted(_) => LinkdefDeleted::TYPE,
Event::ManifestPublished(_) => ManifestPublished::TYPE,
Event::ManifestUnpublished(_) => ManifestUnpublished::TYPE,
}
}
}
/// Errors produced when converting a CloudEvent into one of this module's payload types.
#[derive(Debug, Error)]
pub enum ConversionError {
/// The event's `type` attribute did not match the requested payload type (or, for `Event`,
/// any known type). The unconsumed event is returned to the caller.
#[error("Wrong event type")]
WrongEvent(CloudEvent),
/// The event carried no data, or its data was a plain string rather than binary or JSON.
#[error("No data found")]
NoData,
/// The event data could not be deserialized from JSON into the payload type.
#[error("Error when deserializing: {0}")]
Deser(#[from] serde_json::Error),
}
/// Payload of the `com.wasmcloud.lattice.actor_started` event.
///
/// Every field except `host_id` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ActorStarted {
pub annotations: HashMap<String, String>,
pub claims: ActorClaims,
pub image_ref: String,
pub instance_id: String,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ActorStarted,
"com.wasmcloud.lattice.actor_started",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.actors_started` event.
///
/// Every field except `host_id` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ActorsStarted {
pub annotations: HashMap<String, String>,
pub claims: ActorClaims,
pub image_ref: String,
pub count: usize,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ActorsStarted,
"com.wasmcloud.lattice.actors_started",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.actors_start_failed` event.
///
/// Every field except `host_id` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ActorsStartFailed {
pub annotations: HashMap<String, String>,
pub image_ref: String,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
pub error: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ActorsStartFailed,
"com.wasmcloud.lattice.actors_start_failed",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.actor_stopped` event.
///
/// `instance_id` and `public_key` are required in the event data; the other fields fall back
/// to their defaults when missing.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ActorStopped {
/// Empty map if absent from the data.
#[serde(default)]
pub annotations: HashMap<String, String>,
pub instance_id: String,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ActorStopped,
"com.wasmcloud.lattice.actor_stopped",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.actors_stopped` event.
///
/// `public_key`, `count` and `remaining` are required in the event data; the other fields fall
/// back to their defaults when missing.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ActorsStopped {
/// Empty map if absent from the data.
#[serde(default)]
pub annotations: HashMap<String, String>,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
pub count: usize,
pub remaining: usize,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ActorsStopped,
"com.wasmcloud.lattice.actors_stopped",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.provider_started` event.
///
/// Every field except `host_id` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderStarted {
pub annotations: HashMap<String, String>,
pub claims: ProviderClaims,
pub contract_id: String,
pub image_ref: String,
pub instance_id: String,
pub link_name: String,
pub public_key: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ProviderStarted,
"com.wasmcloud.lattice.provider_started",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.provider_start_failed` event.
///
/// Every field except `host_id` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderStartFailed {
pub error: String,
pub link_name: String,
pub provider_ref: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ProviderStartFailed,
"com.wasmcloud.lattice.provider_start_failed",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.provider_stopped` event.
///
/// `contract_id`, `instance_id`, `link_name`, `public_key` and `reason` are required in the
/// event data; the other fields fall back to their defaults when missing.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderStopped {
    /// Empty map if absent from the data.
    ///
    /// The wire key is the misspelled `annotaions`, which is also what gets written on
    /// serialization. The correctly spelled `annotations` is accepted as an alias when
    /// deserializing, so a payload using the right key no longer silently loses its
    /// annotations. A payload containing both spellings is rejected as a duplicate field.
    // NOTE(review): the misspelling presumably mirrors what the event producer emits; confirm
    // before switching the serialized key to the correct spelling.
    #[serde(default, rename = "annotaions", alias = "annotations")]
    pub annotations: HashMap<String, String>,
    pub contract_id: String,
    pub instance_id: String,
    pub link_name: String,
    pub public_key: String,
    pub reason: String,
    /// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
    /// with the event's `source` attribute (presumably the emitting host's ID; confirm).
    #[serde(default)]
    pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
    ProviderStopped,
    "com.wasmcloud.lattice.provider_stopped",
    source,
    host_id
);
/// Payload of the `com.wasmcloud.lattice.health_check_passed` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderHealthCheckPassed {
/// Flattened: the fields of `ProviderHealthCheckInfo` sit at the top level of the event data
/// rather than under a `data` key.
#[serde(flatten)]
pub data: ProviderHealthCheckInfo,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ProviderHealthCheckPassed,
"com.wasmcloud.lattice.health_check_passed",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.health_check_failed` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderHealthCheckFailed {
/// Flattened: the fields of `ProviderHealthCheckInfo` sit at the top level of the event data
/// rather than under a `data` key.
#[serde(flatten)]
pub data: ProviderHealthCheckInfo,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ProviderHealthCheckFailed,
"com.wasmcloud.lattice.health_check_failed",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.health_check_status` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ProviderHealthCheckStatus {
/// Flattened: the fields of `ProviderHealthCheckInfo` sit at the top level of the event data
/// rather than under a `data` key.
#[serde(flatten)]
pub data: ProviderHealthCheckInfo,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the emitting host's ID; confirm).
#[serde(default)]
pub host_id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `host_id` after deserializing the data.
event_impl!(
ProviderHealthCheckStatus,
"com.wasmcloud.lattice.health_check_status",
source,
host_id
);
/// Payload of the `com.wasmcloud.lattice.linkdef_set` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct LinkdefSet {
/// Flattened: the fields of `Linkdef` sit at the top level of the event data rather than
/// under a `linkdef` key.
#[serde(flatten)]
pub linkdef: Linkdef,
}
// Two-argument form: binds the type string and generates `From<LinkdefSet> for Event` and
// `TryFrom<cloudevents::Event>`; nothing is copied from the event envelope.
event_impl!(LinkdefSet, "com.wasmcloud.lattice.linkdef_set");
/// Payload of the `com.wasmcloud.lattice.linkdef_deleted` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct LinkdefDeleted {
/// Flattened: the fields of `Linkdef` sit at the top level of the event data rather than
/// under a `linkdef` key.
#[serde(flatten)]
pub linkdef: Linkdef,
}
// Two-argument form: binds the type string and generates `From<LinkdefDeleted> for Event` and
// `TryFrom<cloudevents::Event>`; nothing is copied from the event envelope.
event_impl!(LinkdefDeleted, "com.wasmcloud.lattice.linkdef_deleted");
/// Payload of the `com.wasmcloud.lattice.host_started` event.
///
/// `labels` and `friendly_name` must be present in the event data for deserialization to
/// succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HostStarted {
pub labels: HashMap<String, String>,
pub friendly_name: String,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the host's ID; confirm).
#[serde(default)]
pub id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `id` after deserializing the data.
event_impl!(
HostStarted,
"com.wasmcloud.lattice.host_started",
source,
id
);
/// Payload of the `com.wasmcloud.lattice.host_stopped` event.
///
/// `labels` must be present in the event data for deserialization to succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HostStopped {
pub labels: HashMap<String, String>,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the host's ID; confirm).
#[serde(default)]
pub id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `id` after deserializing the data.
event_impl!(
HostStopped,
"com.wasmcloud.lattice.host_stopped",
source,
id
);
/// Payload of the `com.wasmcloud.lattice.host_heartbeat` event.
///
/// Every field except `annotations` and `id` must be present in the event data for
/// deserialization to succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HostHeartbeat {
// NOTE(review): presumably maps an actor identifier to its running instance count; confirm.
pub actors: HashMap<String, usize>,
pub friendly_name: String,
pub labels: HashMap<String, String>,
/// Empty map if absent from the data.
#[serde(default)]
pub annotations: HashMap<String, String>,
pub providers: Vec<ProviderInfo>,
pub uptime_human: String,
pub uptime_seconds: usize,
/// Deserialized as a `semver::Version`, so a value that is not valid semver fails
/// deserialization of the whole event.
pub version: semver::Version,
/// Empty if absent from the data. When parsed from a CloudEvent this is always overwritten
/// with the event's `source` attribute (presumably the host's ID; confirm).
#[serde(default)]
pub id: String,
}
// Binds the type string and generates `TryFrom<cloudevents::Event>`, which copies the event's
// `source` attribute into `id` after deserializing the data.
event_impl!(
HostHeartbeat,
"com.wasmcloud.lattice.host_heartbeat",
source,
id
);
/// Payload of the `com.wadm.manifest_published` event.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ManifestPublished {
/// Flattened: the fields of `Manifest` sit at the top level of the event data rather than
/// under a `manifest` key.
#[serde(flatten)]
pub manifest: Manifest,
}
// Two-argument form: binds the type string and generates `From<ManifestPublished> for Event`
// and `TryFrom<cloudevents::Event>`; nothing is copied from the event envelope.
event_impl!(ManifestPublished, "com.wadm.manifest_published");
/// Payload of the `com.wadm.manifest_unpublished` event.
///
/// `name` must be present in the event data for deserialization to succeed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ManifestUnpublished {
// NOTE(review): presumably the name of the manifest that was unpublished; confirm.
pub name: String,
}
// Two-argument form: binds the type string and generates `From<ManifestUnpublished> for Event`
// and `TryFrom<cloudevents::Event>`; nothing is copied from the event envelope.
event_impl!(ManifestUnpublished, "com.wadm.manifest_unpublished");
#[cfg(test)]
mod test {
    use super::*;

    // A well-formed CloudEvent whose `type` (`refmap_set`) has no matching `Event` variant.
    const NON_SUPPORTED_EVENT: &str = r#"
{
"data": {
"oci_url": "wasmcloud.azurecr.io/httpserver:0.16.0",
"public_key": "VAG3QITQQ2ODAOWB5TTQSDJ53XK3SHBEIFNK4AYJ5RKAX2UNSCAPHA5M"
},
"datacontenttype": "application/json",
"id": "2435a9d8-8ff9-4715-8d21-2f0dc128ec48",
"source": "NB6PMW4RGLBP3NAVUVO2IH34VFJFSX7LF7TJOQCDU4GGUGF3P57SZLPX",
"specversion": "1.0",
"time": "2023-02-14T19:21:09.018468Z",
"type": "com.wasmcloud.lattice.refmap_set"
}
"#;

    /// An event type the module does not model must be rejected with `WrongEvent`.
    #[test]
    fn test_non_supported_event() {
        let unsupported: cloudevents::Event = serde_json::from_str(NON_SUPPORTED_EVENT).unwrap();
        let outcome = Event::new(unsupported);
        let err = outcome.expect_err("Should have errored on a non-supported event");
        assert!(
            matches!(err, ConversionError::WrongEvent(_)),
            "Should have returned wrong event error"
        );
    }

    /// Every event in the fixture file must parse into some `Event` variant.
    #[test]
    fn test_all_supported_events() {
        let fixture = std::fs::read("./test/data/events.json").expect("Unable to load test data");
        let cloud_events: Vec<cloudevents::Event> = serde_json::from_slice(&fixture).unwrap();
        for cloud_event in cloud_events {
            // Printed so a failing fixture entry can be identified in the test output.
            println!("EVT {:?}", cloud_event);
            Event::new(cloud_event).expect("Should be able to parse event");
        }
    }
}