use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::fmt::Debug;
use uuid::Uuid;
pub trait Event: Send + Sync + Debug + 'static {
fn event_name(&self) -> &str;
fn event_id(&self) -> Uuid;
fn timestamp(&self) -> DateTime<Utc>;
fn as_any(&self) -> &dyn Any;
fn clone_event(&self) -> Box<dyn Event>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
pub id: Uuid,
pub name: String,
pub timestamp: DateTime<Utc>,
pub correlation_id: Option<Uuid>,
pub causation_id: Option<Uuid>,
pub metadata: serde_json::Value,
}
impl EventMetadata {
pub fn new(name: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
name: name.into(),
timestamp: Utc::now(),
correlation_id: None,
causation_id: None,
metadata: serde_json::Value::Object(serde_json::Map::new()),
}
}
pub fn with_correlation_id(mut self, id: Uuid) -> Self {
self.correlation_id = Some(id);
self
}
pub fn with_causation_id(mut self, id: Uuid) -> Self {
self.causation_id = Some(id);
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DomainEvent {
#[serde(flatten)]
pub metadata: EventMetadata,
pub aggregate_id: String,
pub aggregate_type: String,
pub version: u32,
pub payload: serde_json::Value,
}
impl DomainEvent {
pub fn new(
event_name: impl Into<String>,
aggregate_id: impl Into<String>,
aggregate_type: impl Into<String>,
payload: serde_json::Value,
) -> Self {
Self {
metadata: EventMetadata::new(event_name),
aggregate_id: aggregate_id.into(),
aggregate_type: aggregate_type.into(),
version: 1,
payload,
}
}
}
#[async_trait]
pub trait EventHandler<E: Event>: Send + Sync {
async fn handle(&self, event: &E) -> Result<(), EventHandlerError>;
}
#[derive(Debug, thiserror::Error)]
pub enum EventHandlerError {
#[error("Handler failed: {0}")]
HandlerFailed(String),
#[error("Event processing error: {0}")]
ProcessingError(String),
#[error("Handler not found for event: {0}")]
HandlerNotFound(String),
}
#[async_trait]
pub trait DynEventHandler: Send + Sync {
async fn handle_dyn(&self, event: &dyn Event) -> Result<(), EventHandlerError>;
fn clone_handler(&self) -> Box<dyn DynEventHandler>;
}
pub struct TypedEventHandler<E: Event, H: EventHandler<E>> {
handler: H,
_phantom: std::marker::PhantomData<E>,
}
impl<E: Event, H: EventHandler<E>> TypedEventHandler<E, H> {
pub fn new(handler: H) -> Self {
Self {
handler,
_phantom: std::marker::PhantomData,
}
}
}
#[async_trait]
impl<E: Event + Clone, H: EventHandler<E> + Clone + 'static> DynEventHandler
for TypedEventHandler<E, H>
{
async fn handle_dyn(&self, event: &dyn Event) -> Result<(), EventHandlerError> {
if let Some(typed_event) = event.as_any().downcast_ref::<E>() {
self.handler.handle(typed_event).await
} else {
Err(EventHandlerError::HandlerFailed(
"Type mismatch".to_string(),
))
}
}
fn clone_handler(&self) -> Box<dyn DynEventHandler> {
Box::new(Self {
handler: self.handler.clone(),
_phantom: std::marker::PhantomData,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TestEvent {
metadata: EventMetadata,
data: String,
}
impl TestEvent {
#[allow(dead_code)]
fn new(data: String) -> Self {
Self {
metadata: EventMetadata::new("test_event"),
data,
}
}
}
impl Event for TestEvent {
fn event_name(&self) -> &str {
&self.metadata.name
}
fn event_id(&self) -> Uuid {
self.metadata.id
}
fn timestamp(&self) -> DateTime<Utc> {
self.metadata.timestamp
}
fn as_any(&self) -> &dyn Any {
self
}
fn clone_event(&self) -> Box<dyn Event> {
Box::new(self.clone())
}
}
#[test]
fn test_event_metadata() {
let metadata = EventMetadata::new("test_event").with_correlation_id(Uuid::new_v4());
assert_eq!(metadata.name, "test_event");
assert!(metadata.correlation_id.is_some());
}
#[test]
fn test_domain_event() {
let event = DomainEvent::new(
"user_created",
"user-123",
"User",
serde_json::json!({"name": "Alice"}),
);
assert_eq!(event.metadata.name, "user_created");
assert_eq!(event.aggregate_id, "user-123");
assert_eq!(event.aggregate_type, "User");
}
}