eventuali_core/
proto.rs

1use crate::{Event, EventData, EventMetadata, Result, EventualiError};
2use crate::aggregate::AggregateSnapshot;
3use prost::Message;
4use uuid::Uuid;
5use chrono::{DateTime, Utc};
6
/// Protocol Buffers message types used by [`ProtoSerializer`].
///
/// The module body is textually included from `eventuali.rs` inside Cargo's
/// `OUT_DIR`, so the types only exist after the build step has produced that
/// file. NOTE(review): the file is presumably emitted by a prost build script;
/// the generator is not visible here — confirm against `build.rs`.
pub mod eventuali {
    include!(concat!(env!("OUT_DIR"), "/eventuali.rs"));
}
11
/// Stateless converter between domain events/snapshots and their Protocol
/// Buffers byte encoding.
///
/// The type carries no fields, so copies are free and all instances are
/// interchangeable.
#[derive(Debug, Clone, Copy)]
pub struct ProtoSerializer;
13
14impl ProtoSerializer {
15    pub fn new() -> Self {
16        Self
17    }
18
19    /// Serialize an event to Protocol Buffers format
20    pub fn serialize_event(&self, event: &Event) -> Result<Vec<u8>> {
21        let proto_event = self.event_to_proto(event)?;
22        let mut buf = Vec::new();
23        proto_event.encode(&mut buf)
24            .map_err(|e| EventualiError::InvalidEventData(e.to_string()))?;
25        Ok(buf)
26    }
27
28    /// Deserialize an event from Protocol Buffers format
29    pub fn deserialize_event(&self, data: &[u8]) -> Result<Event> {
30        let proto_event = eventuali::Event::decode(data)
31            .map_err(|e| EventualiError::Protobuf(prost::DecodeError::new(e.to_string())))?;
32        self.proto_to_event(proto_event)
33    }
34
35    /// Serialize an aggregate snapshot to Protocol Buffers format
36    pub fn serialize_snapshot(&self, snapshot: &AggregateSnapshot) -> Result<Vec<u8>> {
37        let data_bytes = serde_json::to_vec(&snapshot.data)
38            .map_err(EventualiError::Serialization)?;
39        
40        let proto_snapshot = eventuali::AggregateSnapshot {
41            aggregate_id: snapshot.aggregate_id.clone(),
42            aggregate_type: snapshot.aggregate_type.clone(),
43            version: snapshot.version,
44            data: data_bytes,
45            timestamp: snapshot.timestamp.timestamp(),
46        };
47
48        let mut buf = Vec::new();
49        proto_snapshot.encode(&mut buf)
50            .map_err(|e| EventualiError::InvalidEventData(e.to_string()))?;
51        Ok(buf)
52    }
53
54    /// Deserialize an aggregate snapshot from Protocol Buffers format
55    pub fn deserialize_snapshot(&self, data: &[u8]) -> Result<AggregateSnapshot> {
56        let proto_snapshot = eventuali::AggregateSnapshot::decode(data)
57            .map_err(|e| EventualiError::Protobuf(prost::DecodeError::new(e.to_string())))?;
58
59        let data = if !proto_snapshot.data.is_empty() {
60            serde_json::from_slice(&proto_snapshot.data)
61                .map_err(EventualiError::Serialization)?
62        } else {
63            serde_json::json!({})
64        };
65
66        let timestamp = DateTime::from_timestamp(proto_snapshot.timestamp, 0)
67            .unwrap_or_else(Utc::now)
68            .with_timezone(&Utc);
69
70        Ok(AggregateSnapshot {
71            aggregate_id: proto_snapshot.aggregate_id,
72            aggregate_type: proto_snapshot.aggregate_type,
73            version: proto_snapshot.version,
74            data,
75            timestamp,
76        })
77    }
78
79    fn event_to_proto(&self, event: &Event) -> Result<eventuali::Event> {
80        let data_bytes = match &event.data {
81            EventData::Json(json) => {
82                serde_json::to_vec(json)
83                    .map_err(EventualiError::Serialization)?
84            },
85            EventData::Protobuf(bytes) => bytes.clone(),
86        };
87
88        let metadata = eventuali::EventMetadata {
89            causation_id: event.metadata.causation_id.map(|id| id.to_string()).unwrap_or_default(),
90            correlation_id: event.metadata.correlation_id.map(|id| id.to_string()).unwrap_or_default(),
91            user_id: event.metadata.user_id.clone().unwrap_or_default(),
92            headers: event.metadata.headers.clone(),
93        };
94
95        Ok(eventuali::Event {
96            id: event.id.to_string(),
97            aggregate_id: event.aggregate_id.clone(),
98            aggregate_type: event.aggregate_type.clone(),
99            event_type: event.event_type.clone(),
100            event_version: event.event_version,
101            aggregate_version: event.aggregate_version,
102            data: data_bytes,
103            metadata: Some(metadata),
104            timestamp: event.timestamp.timestamp(),
105        })
106    }
107
108    fn proto_to_event(&self, proto_event: eventuali::Event) -> Result<Event> {
109        let id = Uuid::parse_str(&proto_event.id)
110            .map_err(|_| EventualiError::InvalidEventData("Invalid UUID".to_string()))?;
111
112        let data = if !proto_event.data.is_empty() {
113            EventData::Protobuf(proto_event.data)
114        } else {
115            EventData::Json(serde_json::json!({}))
116        };
117
118        let metadata = if let Some(meta) = proto_event.metadata {
119            EventMetadata {
120                causation_id: if meta.causation_id.is_empty() { 
121                    None 
122                } else { 
123                    Uuid::parse_str(&meta.causation_id).ok() 
124                },
125                correlation_id: if meta.correlation_id.is_empty() { 
126                    None 
127                } else { 
128                    Uuid::parse_str(&meta.correlation_id).ok() 
129                },
130                user_id: if meta.user_id.is_empty() { None } else { Some(meta.user_id) },
131                headers: meta.headers,
132            }
133        } else {
134            EventMetadata::default()
135        };
136
137        let timestamp = DateTime::from_timestamp(proto_event.timestamp, 0)
138            .unwrap_or_else(Utc::now)
139            .with_timezone(&Utc);
140
141        Ok(Event {
142            id,
143            aggregate_id: proto_event.aggregate_id,
144            aggregate_type: proto_event.aggregate_type,
145            event_type: proto_event.event_type,
146            event_version: proto_event.event_version,
147            aggregate_version: proto_event.aggregate_version,
148            data,
149            metadata,
150            timestamp,
151        })
152    }
153}
154
155impl Default for ProtoSerializer {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161// Utility functions for specific event types
162impl ProtoSerializer {
163    /// Create a UserRegistered event
164    pub fn create_user_registered(name: String, email: String) -> eventuali::UserRegistered {
165        eventuali::UserRegistered { name, email }
166    }
167
168    /// Create a UserEmailChanged event
169    pub fn create_user_email_changed(old_email: String, new_email: String) -> eventuali::UserEmailChanged {
170        eventuali::UserEmailChanged { old_email, new_email }
171    }
172
173    /// Create a UserDeactivated event
174    pub fn create_user_deactivated(reason: String) -> eventuali::UserDeactivated {
175        eventuali::UserDeactivated { reason }
176    }
177
178    /// Create an OrderPlaced event
179    pub fn create_order_placed(
180        customer_id: String, 
181        items: Vec<eventuali::OrderItem>, 
182        total_amount: f64
183    ) -> eventuali::OrderPlaced {
184        eventuali::OrderPlaced { customer_id, items, total_amount }
185    }
186}