1use crate::{Event, EventData, EventMetadata, Result, EventualiError};
2use crate::aggregate::AggregateSnapshot;
3use prost::Message;
4use uuid::Uuid;
5use chrono::{DateTime, Utc};
6
7pub mod eventuali {
9 include!(concat!(env!("OUT_DIR"), "/eventuali.rs"));
10}
11
12pub struct ProtoSerializer;
13
14impl ProtoSerializer {
15 pub fn new() -> Self {
16 Self
17 }
18
19 pub fn serialize_event(&self, event: &Event) -> Result<Vec<u8>> {
21 let proto_event = self.event_to_proto(event)?;
22 let mut buf = Vec::new();
23 proto_event.encode(&mut buf)
24 .map_err(|e| EventualiError::InvalidEventData(e.to_string()))?;
25 Ok(buf)
26 }
27
28 pub fn deserialize_event(&self, data: &[u8]) -> Result<Event> {
30 let proto_event = eventuali::Event::decode(data)
31 .map_err(|e| EventualiError::Protobuf(prost::DecodeError::new(e.to_string())))?;
32 self.proto_to_event(proto_event)
33 }
34
35 pub fn serialize_snapshot(&self, snapshot: &AggregateSnapshot) -> Result<Vec<u8>> {
37 let data_bytes = serde_json::to_vec(&snapshot.data)
38 .map_err(EventualiError::Serialization)?;
39
40 let proto_snapshot = eventuali::AggregateSnapshot {
41 aggregate_id: snapshot.aggregate_id.clone(),
42 aggregate_type: snapshot.aggregate_type.clone(),
43 version: snapshot.version,
44 data: data_bytes,
45 timestamp: snapshot.timestamp.timestamp(),
46 };
47
48 let mut buf = Vec::new();
49 proto_snapshot.encode(&mut buf)
50 .map_err(|e| EventualiError::InvalidEventData(e.to_string()))?;
51 Ok(buf)
52 }
53
54 pub fn deserialize_snapshot(&self, data: &[u8]) -> Result<AggregateSnapshot> {
56 let proto_snapshot = eventuali::AggregateSnapshot::decode(data)
57 .map_err(|e| EventualiError::Protobuf(prost::DecodeError::new(e.to_string())))?;
58
59 let data = if !proto_snapshot.data.is_empty() {
60 serde_json::from_slice(&proto_snapshot.data)
61 .map_err(EventualiError::Serialization)?
62 } else {
63 serde_json::json!({})
64 };
65
66 let timestamp = DateTime::from_timestamp(proto_snapshot.timestamp, 0)
67 .unwrap_or_else(Utc::now)
68 .with_timezone(&Utc);
69
70 Ok(AggregateSnapshot {
71 aggregate_id: proto_snapshot.aggregate_id,
72 aggregate_type: proto_snapshot.aggregate_type,
73 version: proto_snapshot.version,
74 data,
75 timestamp,
76 })
77 }
78
79 fn event_to_proto(&self, event: &Event) -> Result<eventuali::Event> {
80 let data_bytes = match &event.data {
81 EventData::Json(json) => {
82 serde_json::to_vec(json)
83 .map_err(EventualiError::Serialization)?
84 },
85 EventData::Protobuf(bytes) => bytes.clone(),
86 };
87
88 let metadata = eventuali::EventMetadata {
89 causation_id: event.metadata.causation_id.map(|id| id.to_string()).unwrap_or_default(),
90 correlation_id: event.metadata.correlation_id.map(|id| id.to_string()).unwrap_or_default(),
91 user_id: event.metadata.user_id.clone().unwrap_or_default(),
92 headers: event.metadata.headers.clone(),
93 };
94
95 Ok(eventuali::Event {
96 id: event.id.to_string(),
97 aggregate_id: event.aggregate_id.clone(),
98 aggregate_type: event.aggregate_type.clone(),
99 event_type: event.event_type.clone(),
100 event_version: event.event_version,
101 aggregate_version: event.aggregate_version,
102 data: data_bytes,
103 metadata: Some(metadata),
104 timestamp: event.timestamp.timestamp(),
105 })
106 }
107
108 fn proto_to_event(&self, proto_event: eventuali::Event) -> Result<Event> {
109 let id = Uuid::parse_str(&proto_event.id)
110 .map_err(|_| EventualiError::InvalidEventData("Invalid UUID".to_string()))?;
111
112 let data = if !proto_event.data.is_empty() {
113 EventData::Protobuf(proto_event.data)
114 } else {
115 EventData::Json(serde_json::json!({}))
116 };
117
118 let metadata = if let Some(meta) = proto_event.metadata {
119 EventMetadata {
120 causation_id: if meta.causation_id.is_empty() {
121 None
122 } else {
123 Uuid::parse_str(&meta.causation_id).ok()
124 },
125 correlation_id: if meta.correlation_id.is_empty() {
126 None
127 } else {
128 Uuid::parse_str(&meta.correlation_id).ok()
129 },
130 user_id: if meta.user_id.is_empty() { None } else { Some(meta.user_id) },
131 headers: meta.headers,
132 }
133 } else {
134 EventMetadata::default()
135 };
136
137 let timestamp = DateTime::from_timestamp(proto_event.timestamp, 0)
138 .unwrap_or_else(Utc::now)
139 .with_timezone(&Utc);
140
141 Ok(Event {
142 id,
143 aggregate_id: proto_event.aggregate_id,
144 aggregate_type: proto_event.aggregate_type,
145 event_type: proto_event.event_type,
146 event_version: proto_event.event_version,
147 aggregate_version: proto_event.aggregate_version,
148 data,
149 metadata,
150 timestamp,
151 })
152 }
153}
154
155impl Default for ProtoSerializer {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
161impl ProtoSerializer {
163 pub fn create_user_registered(name: String, email: String) -> eventuali::UserRegistered {
165 eventuali::UserRegistered { name, email }
166 }
167
168 pub fn create_user_email_changed(old_email: String, new_email: String) -> eventuali::UserEmailChanged {
170 eventuali::UserEmailChanged { old_email, new_email }
171 }
172
173 pub fn create_user_deactivated(reason: String) -> eventuali::UserDeactivated {
175 eventuali::UserDeactivated { reason }
176 }
177
178 pub fn create_order_placed(
180 customer_id: String,
181 items: Vec<eventuali::OrderItem>,
182 total_amount: f64
183 ) -> eventuali::OrderPlaced {
184 eventuali::OrderPlaced { customer_id, items, total_amount }
185 }
186}