use serde_json::Value;
use umadb_dcb::DcbEvent;
use uuid::Uuid;
use crate::{
domain_id::{DomainIdValue, DomainIdValues},
error::SerializationError,
event::{Event, EventEnvelope, StoredEventData},
};
#[derive(Debug, Default)]
pub struct Emit {
events: Vec<EmittedEvent>,
}
#[derive(Debug)]
pub struct EmittedEvent {
pub event_type: String,
pub data: Value,
pub domain_ids: DomainIdValues,
}
impl Emit {
pub fn new() -> Self {
Self { events: Vec::new() }
}
pub fn event<E: Event>(mut self, event: E) -> Self {
let emitted = EmittedEvent::new(event);
self.events.push(emitted);
self
}
pub fn try_event<E: Event>(mut self, event: E) -> Result<Self, SerializationError> {
let domain_ids = event.domain_ids();
let emitted = EmittedEvent {
event_type: E::EVENT_TYPE.to_string(),
data: serde_json::to_value(event)?,
domain_ids,
};
self.events.push(emitted);
Ok(self)
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn len(&self) -> usize {
self.events.len()
}
pub fn into_events(self) -> Vec<EmittedEvent> {
self.events
}
pub fn events(&self) -> &[EmittedEvent] {
&self.events
}
pub fn contains_event_type<E: Event>(&self) -> bool {
self.events
.iter()
.any(|event| event.event_type == E::EVENT_TYPE)
}
}
impl EmittedEvent {
pub fn new<E: Event>(event: E) -> Self {
let domain_ids = event.domain_ids();
EmittedEvent {
event_type: E::EVENT_TYPE.to_string(),
data: serde_json::to_value(event).expect("event serialization failed"),
domain_ids,
}
}
pub fn into_dcb_event(self, envelope: EventEnvelope) -> DcbEvent {
DcbEvent {
event_type: self.event_type,
tags: self
.domain_ids
.into_iter()
.filter_map(|(category, id)| {
assert!(
!category.contains(':'),
"domain id categories cannot contain a colon character"
);
match id {
DomainIdValue::Value(id) => Some(format!("{category}:{id}")),
DomainIdValue::None => None,
}
})
.collect(),
data: encode_with_envelope(envelope, self.data),
uuid: Some(Uuid::new_v4()),
}
}
}
pub fn encode_with_envelope(envelope: EventEnvelope, data: Value) -> Vec<u8> {
serde_json::to_vec(&StoredEventData {
timestamp: envelope.timestamp,
correlation_id: envelope.correlation_id,
causation_id: envelope.causation_id,
triggering_event_id: envelope.triggering_event_id,
idempotency_key: envelope.idempotency_key,
data,
})
.unwrap()
}