use evidentsource_core::domain::{
Event, EventData, ExtensionValue, ProspectiveEvent, Transaction, TransactionSummary,
};
use crate::com::evidentsource as proto;
use crate::io::cloudevents::v1 as proto_ce;
use super::error::ConversionError;
impl From<Event> for proto_ce::CloudEvent {
fn from(event: Event) -> Self {
use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
use proto_ce::cloud_event::CloudEventAttributeValue;
let mut attributes = std::collections::HashMap::new();
if let Some(subject) = event.subject {
attributes.insert(
"subject".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeString(subject)),
},
);
}
if let Some(time) = event.time {
attributes.insert(
"time".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
seconds: time.timestamp(),
nanos: time.timestamp_subsec_nanos() as i32,
})),
},
);
}
if let Some(datacontenttype) = event.datacontenttype {
attributes.insert(
"datacontenttype".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeString(datacontenttype)),
},
);
}
if let Some(dataschema) = event.dataschema {
attributes.insert(
"dataschema".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeUri(dataschema)),
},
);
}
for (key, value) in event.extensions {
let attr = match value {
ExtensionValue::String(s) => Some(Attr::CeString(s)),
ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
};
if let Some(attr) = attr {
attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
}
}
let data = event.data.map(|d| match d {
EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
});
proto_ce::CloudEvent {
spec_version: "1.0".to_string(),
id: event.id,
source: event.source,
r#type: event.event_type,
attributes,
data,
}
}
}
impl From<ProspectiveEvent> for proto_ce::CloudEvent {
fn from(event: ProspectiveEvent) -> Self {
use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
use proto_ce::cloud_event::CloudEventAttributeValue;
let mut attributes = std::collections::HashMap::new();
if let Some(subject) = event.subject {
attributes.insert(
"subject".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeString(subject)),
},
);
}
if let Some(time) = event.time {
attributes.insert(
"time".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
seconds: time.timestamp(),
nanos: time.timestamp_subsec_nanos() as i32,
})),
},
);
}
if let Some(datacontenttype) = event.datacontenttype {
attributes.insert(
"datacontenttype".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeString(datacontenttype)),
},
);
}
if let Some(dataschema) = event.dataschema {
attributes.insert(
"dataschema".to_string(),
CloudEventAttributeValue {
attr: Some(Attr::CeUri(dataschema)),
},
);
}
for (key, value) in event.extensions {
let attr = match value {
ExtensionValue::String(s) => Some(Attr::CeString(s)),
ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
};
if let Some(attr) = attr {
attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
}
}
let data = event.data.map(|d| match d {
EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
});
proto_ce::CloudEvent {
spec_version: "1.0".to_string(),
id: event.id,
source: event.stream, r#type: event.event_type,
attributes,
data,
}
}
}
impl TryFrom<proto_ce::CloudEvent> for Event {
type Error = ConversionError;
fn try_from(proto: proto_ce::CloudEvent) -> Result<Self, Self::Error> {
use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
let mut subject = None;
let mut time = None;
let mut datacontenttype = None;
let mut dataschema = None;
let mut extensions = Vec::new();
for (key, value) in proto.attributes {
if let Some(attr) = value.attr {
match (key.as_str(), attr) {
("subject", Attr::CeString(s)) => {
subject = Some(s);
}
("time", Attr::CeTimestamp(ts)) => {
time = chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32);
}
("datacontenttype", Attr::CeString(s)) => {
datacontenttype = Some(s);
}
("dataschema", Attr::CeUri(s)) => {
dataschema = Some(s);
}
(key, Attr::CeString(s)) => {
extensions.push((key.to_string(), ExtensionValue::String(s)));
}
(key, Attr::CeBoolean(b)) => {
extensions.push((key.to_string(), ExtensionValue::Boolean(b)));
}
(key, Attr::CeInteger(i)) => {
extensions.push((key.to_string(), ExtensionValue::Integer(i as i64)));
}
_ => {}
}
}
}
let data = proto.data.map(|d| match d {
proto_ce::cloud_event::Data::BinaryData(bytes) => EventData::Binary(bytes),
proto_ce::cloud_event::Data::TextData(text) => EventData::String(text),
proto_ce::cloud_event::Data::ProtoData(_) => {
EventData::Binary(vec![])
}
});
Ok(Event {
id: proto.id,
source: proto.source,
event_type: proto.r#type,
subject,
data,
time,
datacontenttype,
dataschema,
extensions,
})
}
}
impl TryFrom<proto::Transaction> for Transaction {
type Error = ConversionError;
fn try_from(proto: proto::Transaction) -> Result<Self, Self::Error> {
let event_count = proto.events.len();
let events: Result<Vec<Event>, _> = proto.events.into_iter().map(Event::try_from).collect();
Ok(Transaction {
events: events?,
summary: TransactionSummary {
transaction_id: if proto.id.is_empty() {
None
} else {
Some(proto.id)
},
revision: proto.basis, event_count,
},
})
}
}
impl From<proto::TransactionSummary> for TransactionSummary {
fn from(proto: proto::TransactionSummary) -> Self {
TransactionSummary {
transaction_id: if proto.id.is_empty() {
None
} else {
Some(proto.id)
},
revision: proto.revision,
event_count: 0, }
}
}
#[cfg(feature = "cloudevents")]
use cloudevents::{AttributesReader, Data, EventBuilder, EventBuilderV10};
#[cfg(feature = "cloudevents")]
pub fn event_to_cloudevent(event: Event) -> cloudevents::Event {
let mut builder = EventBuilderV10::new()
.id(event.id)
.source(event.source)
.ty(event.event_type);
if let Some(subject) = event.subject {
builder = builder.subject(subject);
}
if let Some(time) = event.time {
builder = builder.time(time);
}
for (key, value) in event.extensions {
builder = match value {
ExtensionValue::String(s) => builder.extension(&key, s),
ExtensionValue::Boolean(b) => builder.extension(&key, b),
ExtensionValue::Integer(i) => builder.extension(&key, i),
};
}
if let Some(data) = event.data {
let content_type = event
.datacontenttype
.unwrap_or_else(|| "application/octet-stream".to_string());
builder = match data {
EventData::Binary(bytes) => builder.data(content_type, bytes),
EventData::String(s) => builder.data(content_type, s),
};
}
builder
.build()
.expect("Failed to build CloudEvent from Event")
}
#[cfg(feature = "cloudevents")]
pub fn prospective_event_to_cloudevent(event: ProspectiveEvent) -> cloudevents::Event {
let mut builder = EventBuilderV10::new()
.id(event.id)
.source(event.stream) .ty(event.event_type);
if let Some(subject) = event.subject {
builder = builder.subject(subject);
}
if let Some(time) = event.time {
builder = builder.time(time);
}
for (key, value) in event.extensions {
builder = match value {
ExtensionValue::String(s) => builder.extension(&key, s),
ExtensionValue::Boolean(b) => builder.extension(&key, b),
ExtensionValue::Integer(i) => builder.extension(&key, i),
};
}
if let Some(data) = event.data {
let content_type = event
.datacontenttype
.unwrap_or_else(|| "application/octet-stream".to_string());
builder = match data {
EventData::Binary(bytes) => builder.data(content_type, bytes),
EventData::String(s) => builder.data(content_type, s),
};
}
builder
.build()
.expect("Failed to build CloudEvent from ProspectiveEvent")
}
#[cfg(feature = "cloudevents")]
pub fn cloudevent_to_event(ce: cloudevents::Event) -> Result<Event, ConversionError> {
let mut extensions = Vec::new();
for (key, value) in ce.iter_extensions() {
let ext_value = match value {
cloudevents::event::ExtensionValue::String(s) => ExtensionValue::String(s.to_string()),
cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
};
extensions.push((key.to_string(), ext_value));
}
let (data, datacontenttype) = match ce.data() {
Some(Data::Binary(bytes)) => (
Some(EventData::Binary(bytes.clone())),
ce.datacontenttype().map(|s| s.to_string()),
),
Some(Data::String(s)) => (
Some(EventData::String(s.clone())),
ce.datacontenttype().map(|s| s.to_string()),
),
Some(Data::Json(v)) => (
Some(EventData::String(v.to_string())),
Some("application/json".to_string()),
),
None => (None, ce.datacontenttype().map(|s| s.to_string())),
};
Ok(Event {
id: ce.id().to_string(),
source: ce.source().to_string(),
event_type: ce.ty().to_string(),
subject: ce.subject().map(|s| s.to_string()),
data,
time: ce.time().copied(),
datacontenttype,
dataschema: ce.dataschema().map(|u| u.to_string()),
extensions,
})
}
#[cfg(feature = "cloudevents")]
pub fn cloudevent_to_prospective_event(
ce: cloudevents::Event,
) -> Result<ProspectiveEvent, ConversionError> {
let mut extensions = Vec::new();
for (key, value) in ce.iter_extensions() {
let ext_value = match value {
cloudevents::event::ExtensionValue::String(s) => ExtensionValue::String(s.to_string()),
cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
};
extensions.push((key.to_string(), ext_value));
}
let (data, datacontenttype) = match ce.data() {
Some(Data::Binary(bytes)) => (
Some(EventData::Binary(bytes.clone())),
ce.datacontenttype().map(|s| s.to_string()),
),
Some(Data::String(s)) => (
Some(EventData::String(s.clone())),
ce.datacontenttype().map(|s| s.to_string()),
),
Some(Data::Json(v)) => (
Some(EventData::String(v.to_string())),
Some("application/json".to_string()),
),
None => (None, ce.datacontenttype().map(|s| s.to_string())),
};
Ok(ProspectiveEvent {
id: ce.id().to_string(),
stream: ce.source().to_string(), event_type: ce.ty().to_string(),
subject: ce.subject().map(|s| s.to_string()),
data,
time: ce.time().copied(),
datacontenttype,
dataschema: ce.dataschema().map(|u| u.to_string()),
extensions,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_event_to_proto_roundtrip() {
let event = Event {
id: "test-id".to_string(),
source: "https://test.local/db/test/streams/test-stream".to_string(),
event_type: "test.type".to_string(),
subject: Some("test-subject".to_string()),
data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
time: None,
datacontenttype: Some("application/json".to_string()),
dataschema: None,
extensions: vec![],
};
let proto: proto_ce::CloudEvent = event.clone().into();
let back: Event = proto.try_into().unwrap();
assert_eq!(event.id, back.id);
assert_eq!(event.source, back.source);
assert_eq!(event.event_type, back.event_type);
assert_eq!(event.subject, back.subject);
}
#[test]
fn test_prospective_event_to_proto() {
let event = ProspectiveEvent {
id: "test-id".to_string(),
stream: "test-stream".to_string(),
event_type: "test.type".to_string(),
subject: Some("test-subject".to_string()),
data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
time: None,
datacontenttype: Some("application/json".to_string()),
dataschema: None,
extensions: vec![],
};
let proto: proto_ce::CloudEvent = event.into();
assert_eq!(proto.id, "test-id");
assert_eq!(proto.source, "test-stream"); assert_eq!(proto.r#type, "test.type");
}
#[cfg(feature = "cloudevents")]
mod cloudevents_tests {
use super::*;
use cloudevents::{AttributesReader, EventBuilder, EventBuilderV10};
#[test]
fn test_event_to_cloudevents_sdk() {
let event = Event {
id: "test-id".to_string(),
source: "https://test.local/db/test/streams/test-stream".to_string(),
event_type: "test.type".to_string(),
subject: Some("test-subject".to_string()),
data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
time: None,
datacontenttype: Some("application/json".to_string()),
dataschema: None,
extensions: vec![],
};
let ce = event_to_cloudevent(event);
assert_eq!(ce.id(), "test-id");
assert_eq!(
ce.source(),
"https://test.local/db/test/streams/test-stream"
);
assert_eq!(ce.ty(), "test.type");
assert_eq!(ce.subject(), Some("test-subject"));
}
#[test]
fn test_cloudevents_sdk_to_event() {
let ce = EventBuilderV10::new()
.id("test-id")
.source("https://test.local/db/test/streams/test-stream")
.ty("test.type")
.subject("test-subject")
.data("application/json", r#"{"key": "value"}"#)
.build()
.unwrap();
let event = cloudevent_to_event(ce).unwrap();
assert_eq!(event.id, "test-id");
assert_eq!(
event.source,
"https://test.local/db/test/streams/test-stream"
);
assert_eq!(event.event_type, "test.type");
assert_eq!(event.subject, Some("test-subject".to_string()));
}
}
}