use crate::event::{Event, EventMetadata, EventPriority, HasPriority};
use std::any::{Any, TypeId};
use std::fmt;
use std::sync::Arc;
#[derive(Clone)]
pub struct EventEnvelope {
payload: Option<Arc<dyn Any + Send + Sync>>,
payload_bytes: Option<Vec<u8>>,
serializer: fn(&Arc<dyn Any + Send + Sync>) -> crate::Result<Vec<u8>>,
type_id: TypeId,
type_name: String,
pub metadata: EventMetadata,
pub priority: EventPriority,
}
impl EventEnvelope {
pub fn new<T: Event>(event: T) -> Self {
Self::with_metadata(event, EventMetadata::new())
}
pub fn with_metadata<T: Event>(event: T, metadata: EventMetadata) -> Self {
Self {
payload: Some(Arc::new(event)),
payload_bytes: None,
serializer: |any| {
let event = any.downcast_ref::<T>().ok_or_else(|| {
crate::Error::internal("Failed to downcast for serialization")
})?;
serde_json::to_vec(event)
.map_err(|e| crate::Error::SerializationError(e.to_string()))
},
type_id: T::type_id(),
type_name: T::event_type().to_string(),
metadata,
priority: EventPriority::default(),
}
}
pub fn with_priority<T: Event + HasPriority>(event: T, metadata: EventMetadata) -> Self {
let priority = event.priority();
Self {
payload: Some(Arc::new(event)),
payload_bytes: None,
serializer: |any| {
let event = any.downcast_ref::<T>().ok_or_else(|| {
crate::Error::internal("Failed to downcast for serialization")
})?;
serde_json::to_vec(event)
.map_err(|e| crate::Error::SerializationError(e.to_string()))
},
type_id: T::type_id(),
type_name: T::event_type().to_string(),
metadata,
priority,
}
}
pub fn from_serialized(
type_id: TypeId,
type_name: String,
metadata: EventMetadata,
priority: EventPriority,
payload_bytes: Vec<u8>,
) -> Self {
Self {
payload: None,
payload_bytes: Some(payload_bytes),
serializer: |_| {
Err(crate::Error::internal(
"Cannot serialize an already serialized event",
))
},
type_id,
type_name,
metadata,
priority,
}
}
pub fn event_type(&self) -> &str {
&self.type_name
}
pub fn type_id(&self) -> TypeId {
self.type_id
}
pub fn downcast_ref<T: Event>(&self) -> Option<&T> {
if self.type_id == TypeId::of::<T>() {
self.payload.as_ref().and_then(|p| p.downcast_ref::<T>())
} else {
None
}
}
pub fn get_event<T: Event>(&self) -> crate::Result<T> {
if self.type_id != TypeId::of::<T>() {
return Err(crate::Error::EventNotRegistered {
type_name: self.type_name.clone(),
});
}
if let Some(payload) = &self.payload {
if let Some(event) = payload.downcast_ref::<T>() {
return Ok(event.clone());
}
}
if let Some(bytes) = &self.payload_bytes {
return serde_json::from_slice(bytes)
.map_err(|e| crate::Error::SerializationError(e.to_string()));
}
Err(crate::Error::internal("Event envelope is empty"))
}
pub fn into_bytes(&self) -> crate::Result<Vec<u8>> {
if let Some(bytes) = &self.payload_bytes {
Ok(bytes.clone())
} else if let Some(payload) = &self.payload {
(self.serializer)(payload)
} else {
Err(crate::Error::internal("Event envelope is empty"))
}
}
#[allow(clippy::result_large_err)]
pub fn try_into_inner<T: Event>(self) -> Result<Arc<T>, Self> {
if self.type_id == TypeId::of::<T>() {
if let Some(payload) = self.payload.clone() {
match Arc::downcast::<T>(payload) {
Ok(event) => Ok(event),
Err(_) => Err(self),
}
} else {
Err(self)
}
} else {
Err(self)
}
}
pub fn is<T: Event>(&self) -> bool {
self.type_id == TypeId::of::<T>()
}
pub fn correlation_id(&self) -> Option<uuid::Uuid> {
self.metadata.correlation_id
}
pub fn event_id(&self) -> uuid::Uuid {
self.metadata.event_id
}
pub fn clone_payload(&self) -> Option<Arc<dyn Any + Send + Sync>> {
self.payload.clone()
}
pub fn chain<T: Event>(&self, event: T) -> Self {
let mut metadata = EventMetadata::new();
metadata.chain_from(&self.metadata);
Self::with_metadata(event, metadata)
}
}
impl fmt::Debug for EventEnvelope {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventEnvelope")
.field("type_name", &self.type_name)
.field("event_id", &self.metadata.event_id)
.field("priority", &self.priority)
.field("correlation_id", &self.metadata.correlation_id)
.finish()
}
}
#[derive(Debug)]
pub struct EnvelopeBuilder<T: Event> {
event: T,
metadata: EventMetadata,
priority: Option<EventPriority>,
}
impl<T: Event> EnvelopeBuilder<T> {
pub fn new(event: T) -> Self {
Self {
event,
metadata: EventMetadata::new(),
priority: None,
}
}
pub fn metadata(mut self, metadata: EventMetadata) -> Self {
self.metadata = metadata;
self
}
pub fn correlation_id(mut self, id: uuid::Uuid) -> Self {
self.metadata.correlation_id = Some(id);
self
}
pub fn causation_id(mut self, id: uuid::Uuid) -> Self {
self.metadata.causation_id = Some(id);
self
}
pub fn source(mut self, source: impl Into<String>) -> Self {
self.metadata.source = Some(source.into());
self
}
pub fn priority(mut self, priority: EventPriority) -> Self {
self.priority = Some(priority);
self
}
pub fn build(self) -> EventEnvelope {
let mut envelope = EventEnvelope::with_metadata(self.event, self.metadata);
if let Some(priority) = self.priority {
envelope.priority = priority;
}
envelope
}
}
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TestEvent {
id: u64,
_message: String,
}
impl Event for TestEvent {
fn event_type() -> &'static str {
"TestEvent"
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StringEvent(String);
impl Event for StringEvent {
fn event_type() -> &'static str {
"StringEvent"
}
}
#[test]
fn test_envelope_creation() {
let event = TestEvent {
id: 123,
_message: "test".to_string(),
};
let envelope = EventEnvelope::new(event.clone());
assert_eq!(envelope.event_type(), "TestEvent");
assert_eq!(envelope.type_id(), TypeId::of::<TestEvent>());
assert!(envelope.is::<TestEvent>());
assert!(!envelope.is::<StringEvent>());
}
#[test]
fn test_envelope_downcast() {
let event = TestEvent {
id: 456,
_message: "downcast test".to_string(),
};
let envelope = EventEnvelope::new(event);
let downcast = envelope.get_event::<TestEvent>();
assert!(downcast.is_ok());
assert_eq!(downcast.unwrap().id, 456);
let wrong_downcast = envelope.get_event::<StringEvent>();
assert!(wrong_downcast.is_err());
}
#[test]
fn test_envelope_builder() {
let correlation_id = Uuid::max();
let event = TestEvent {
id: 789,
_message: "builder test".to_string(),
};
let envelope = EnvelopeBuilder::new(event)
.correlation_id(correlation_id)
.source("test-source")
.priority(EventPriority::High)
.build();
assert_eq!(envelope.correlation_id(), Some(correlation_id));
assert_eq!(envelope.metadata.source, Some("test-source".to_string()));
assert_eq!(envelope.priority, EventPriority::High);
}
#[test]
fn test_envelope_chaining() {
let parent_event = TestEvent {
id: 1,
_message: "parent".to_string(),
};
let parent_envelope = EventEnvelope::new(parent_event);
let child_event = TestEvent {
id: 2,
_message: "child".to_string(),
};
let child_envelope = parent_envelope.chain(child_event);
assert_eq!(
child_envelope.metadata.causation_id,
Some(parent_envelope.event_id())
);
assert_eq!(
child_envelope.metadata.correlation_id,
Some(parent_envelope.event_id())
);
}
}