1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::vector::VectorTimestamp;
6
7#[derive(Debug, thiserror::Error)]
8pub enum TracedEventError {
9 #[error("missing required header: {0}")]
10 MissingHeader(String),
11
12 #[error("invalid vector clock: {0}")]
13 InvalidVectorClock(String),
14
15 #[error("invalid event type: cannot be empty")]
16 InvalidEventType,
17
18 #[error("json parse error: {0}")]
19 JsonError(#[from] serde_json::Error),
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TracedEvent {
24 event_type: String,
25 payload: Vec<u8>,
26 causality: VectorTimestamp,
27 event_id: String,
28 timestamp_utc: Option<String>,
29}
30
31impl TracedEvent {
32 pub fn new(
33 event_type: &str,
34 payload: &[u8],
35 causality: VectorTimestamp,
36 ) -> Result<Self, TracedEventError> {
37 if event_type.is_empty() {
38 return Err(TracedEventError::InvalidEventType);
39 }
40 Ok(Self {
41 event_type: event_type.to_string(),
42 payload: payload.to_vec(),
43 causality,
44 event_id: uuid::Uuid::now_v7().to_string(),
45 timestamp_utc: None,
46 })
47 }
48
49 pub fn with_id(
50 event_id: &str,
51 event_type: &str,
52 payload: &[u8],
53 causality: VectorTimestamp,
54 ) -> Result<Self, TracedEventError> {
55 if event_type.is_empty() {
56 return Err(TracedEventError::InvalidEventType);
57 }
58 Ok(Self {
59 event_type: event_type.to_string(),
60 payload: payload.to_vec(),
61 causality,
62 event_id: event_id.to_string(),
63 timestamp_utc: None,
64 })
65 }
66
67 pub fn event_type(&self) -> &str {
68 &self.event_type
69 }
70
71 pub fn payload(&self) -> &[u8] {
72 &self.payload
73 }
74
75 pub fn causality(&self) -> &VectorTimestamp {
76 &self.causality
77 }
78
79 pub fn event_id(&self) -> &str {
80 &self.event_id
81 }
82
83 pub fn timestamp_utc(&self) -> Option<&str> {
84 self.timestamp_utc.as_deref()
85 }
86
87 pub fn to_headers(&self) -> HashMap<String, String> {
88 let mut headers = HashMap::new();
89 headers.insert(
90 "X-Causality-Vector".to_string(),
91 self.causality.to_string(),
92 );
93 headers.insert(
94 "X-Causality-EventId".to_string(),
95 self.event_id.clone(),
96 );
97 headers.insert(
98 "X-Causality-EventType".to_string(),
99 self.event_type.clone(),
100 );
101 headers
102 }
103
104 pub fn from_headers(
105 headers: &HashMap<String, String>,
106 payload: &[u8],
107 ) -> Result<Self, TracedEventError> {
108 let vector_str = headers
109 .get("X-Causality-Vector")
110 .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-Vector".to_string()))?;
111 let event_id = headers
112 .get("X-Causality-EventId")
113 .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-EventId".to_string()))?;
114 let event_type = headers
115 .get("X-Causality-EventType")
116 .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-EventType".to_string()))?;
117
118 let causality = vector_str
119 .parse::<VectorTimestamp>()
120 .map_err(|e| TracedEventError::InvalidVectorClock(e.to_string()))?;
121
122 Ok(Self {
123 event_type: event_type.clone(),
124 payload: payload.to_vec(),
125 causality,
126 event_id: event_id.clone(),
127 timestamp_utc: None,
128 })
129 }
130
131 pub fn to_json_value(&self) -> serde_json::Value {
132 serde_json::json!({
133 "_causality": {
134 "vector": serde_json::to_value(self.causality.clocks()).unwrap(),
135 "event_id": self.event_id,
136 "event_type": self.event_type,
137 },
138 "payload": serde_json::to_value(&self.payload).unwrap(),
139 })
140 }
141
142 pub fn from_json_value(value: &serde_json::Value) -> Result<Self, TracedEventError> {
143 let causality_obj = value
144 .get("_causality")
145 .ok_or_else(|| TracedEventError::MissingHeader("_causality".to_string()))?;
146
147 let vector_map: HashMap<String, u64> = serde_json::from_value(
148 causality_obj
149 .get("vector")
150 .ok_or_else(|| TracedEventError::MissingHeader("vector".to_string()))?
151 .clone(),
152 )?;
153
154 let event_id: String = serde_json::from_value(
155 causality_obj
156 .get("event_id")
157 .ok_or_else(|| TracedEventError::MissingHeader("event_id".to_string()))?
158 .clone(),
159 )?;
160
161 let event_type: String = serde_json::from_value(
162 causality_obj
163 .get("event_type")
164 .ok_or_else(|| TracedEventError::MissingHeader("event_type".to_string()))?
165 .clone(),
166 )?;
167
168 let payload: Vec<u8> = serde_json::from_value(
169 value
170 .get("payload")
171 .ok_or_else(|| TracedEventError::MissingHeader("payload".to_string()))?
172 .clone(),
173 )?;
174
175 Ok(Self {
176 event_type,
177 payload,
178 causality: VectorTimestamp::from(vector_map),
179 event_id,
180 timestamp_utc: None,
181 })
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-entry vector clock used throughout: `svc-a` at 3, `svc-b` at 1.
    fn sample_causality() -> VectorTimestamp {
        let clocks: HashMap<String, u64> =
            HashMap::from([("svc-a".to_string(), 3), ("svc-b".to_string(), 1)]);
        VectorTimestamp::from(clocks)
    }

    /// Builds an `order.created` event with the given id and payload.
    fn sample_event(event_id: &str, payload: &[u8]) -> TracedEvent {
        TracedEvent::with_id(event_id, "order.created", payload, sample_causality())
            .expect("a non-empty event type must be accepted")
    }

    /// Builds an owned header map from borrowed name/value pairs.
    fn header_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Asserts that `result` failed with `MissingHeader(expected)`.
    fn assert_missing(result: Result<TracedEvent, TracedEventError>, expected: &str) {
        match result {
            Err(TracedEventError::MissingHeader(name)) => assert_eq!(name, expected),
            other => panic!("expected MissingHeader({:?}), got {:?}", expected, other),
        }
    }

    #[test]
    fn new_preserves_event_type() {
        let event = TracedEvent::new("order.created", b"data", sample_causality()).unwrap();
        assert_eq!(event.event_type(), "order.created");
    }

    #[test]
    fn new_preserves_payload() {
        let payload = b"hello world";
        let event = TracedEvent::new("order.created", payload, sample_causality()).unwrap();
        assert_eq!(event.payload(), payload);
    }

    #[test]
    fn new_captures_vector_timestamp() {
        let causality = sample_causality();
        let event = TracedEvent::new("order.created", b"data", causality.clone()).unwrap();
        assert_eq!(event.causality(), &causality);
    }

    #[test]
    fn new_generates_unique_event_ids() {
        let first = TracedEvent::new("order.created", b"a", sample_causality()).unwrap();
        let second = TracedEvent::new("order.created", b"b", sample_causality()).unwrap();
        assert_ne!(first.event_id(), second.event_id());
    }

    #[test]
    fn new_rejects_empty_event_type() {
        let result = TracedEvent::new("", b"data", sample_causality());
        assert!(matches!(result, Err(TracedEventError::InvalidEventType)));
    }

    #[test]
    fn new_accepts_empty_payload() {
        let event = TracedEvent::new("order.created", b"", sample_causality()).unwrap();
        assert!(event.payload().is_empty());
    }

    #[test]
    fn with_id_uses_provided_id() {
        let event = sample_event("my-id-123", b"data");
        assert_eq!(event.event_id(), "my-id-123");
    }

    #[test]
    fn with_id_rejects_empty_event_type() {
        let result = TracedEvent::with_id("ev-1", "", b"data", sample_causality());
        assert!(matches!(result, Err(TracedEventError::InvalidEventType)));
    }

    #[test]
    fn accessors_return_correct_values() {
        let causality = sample_causality();
        let event =
            TracedEvent::with_id("ev-1", "user.signup", b"payload", causality.clone()).unwrap();
        assert_eq!(event.event_type(), "user.signup");
        assert_eq!(event.payload(), b"payload");
        assert_eq!(event.causality(), &causality);
        assert_eq!(event.event_id(), "ev-1");
        assert_eq!(event.timestamp_utc(), None);
    }

    // NOTE(review): this pins the exact entry order of the vector clock's
    // string form; confirm that `VectorTimestamp`'s formatting is ordered
    // deterministically.
    #[test]
    fn to_headers_contains_vector_clock() {
        let headers = sample_event("ev-1", b"data").to_headers();
        let vector = headers.get("X-Causality-Vector").unwrap();
        assert_eq!(vector, "svc-a=3,svc-b=1");
    }

    #[test]
    fn to_headers_contains_event_id() {
        let headers = sample_event("ev-1", b"data").to_headers();
        assert_eq!(headers.get("X-Causality-EventId").unwrap(), "ev-1");
    }

    #[test]
    fn to_headers_contains_event_type() {
        let headers = sample_event("ev-1", b"data").to_headers();
        assert_eq!(headers.get("X-Causality-EventType").unwrap(), "order.created");
    }

    #[test]
    fn to_headers_does_not_include_payload() {
        let headers = sample_event("ev-1", b"secret").to_headers();
        assert_eq!(headers.len(), 3);
        assert!(headers.values().all(|value| !value.contains("secret")));
    }

    #[test]
    fn from_headers_roundtrip() {
        let payload = b"roundtrip-data";
        let original = sample_event("ev-rt", payload);
        let restored = TracedEvent::from_headers(&original.to_headers(), payload).unwrap();

        assert_eq!(restored.event_type(), original.event_type());
        assert_eq!(restored.event_id(), original.event_id());
        assert_eq!(restored.payload(), original.payload());
        assert_eq!(restored.causality(), original.causality());
    }

    #[test]
    fn from_headers_missing_vector_returns_error() {
        let headers = header_map(&[
            ("X-Causality-EventId", "ev-1"),
            ("X-Causality-EventType", "order.created"),
        ]);
        assert_missing(
            TracedEvent::from_headers(&headers, b"data"),
            "X-Causality-Vector",
        );
    }

    #[test]
    fn from_headers_missing_event_id_returns_error() {
        let headers = header_map(&[
            ("X-Causality-Vector", "svc-a=3"),
            ("X-Causality-EventType", "order.created"),
        ]);
        assert_missing(
            TracedEvent::from_headers(&headers, b"data"),
            "X-Causality-EventId",
        );
    }

    #[test]
    fn from_headers_missing_event_type_returns_error() {
        let headers = header_map(&[
            ("X-Causality-Vector", "svc-a=3"),
            ("X-Causality-EventId", "ev-1"),
        ]);
        assert_missing(
            TracedEvent::from_headers(&headers, b"data"),
            "X-Causality-EventType",
        );
    }

    #[test]
    fn from_headers_invalid_vector_returns_error() {
        let headers = header_map(&[
            ("X-Causality-Vector", "not-valid"),
            ("X-Causality-EventId", "ev-1"),
            ("X-Causality-EventType", "order.created"),
        ]);
        let result = TracedEvent::from_headers(&headers, b"data");
        assert!(matches!(
            result,
            Err(TracedEventError::InvalidVectorClock(_))
        ));
    }

    #[test]
    fn to_json_contains_causality_field() {
        let json = sample_event("ev-1", b"hi").to_json_value();

        let causality = json.get("_causality").expect("missing _causality");
        assert_eq!(causality.get("event_id").unwrap(), "ev-1");
        assert_eq!(causality.get("event_type").unwrap(), "order.created");

        let vector = causality.get("vector").unwrap().as_object().unwrap();
        assert_eq!(vector.get("svc-a").unwrap(), 3);
        assert_eq!(vector.get("svc-b").unwrap(), 1);
    }

    #[test]
    fn from_json_roundtrip() {
        let original = sample_event("ev-json", b"json-data");
        let restored = TracedEvent::from_json_value(&original.to_json_value()).unwrap();

        assert_eq!(restored.event_type(), original.event_type());
        assert_eq!(restored.event_id(), original.event_id());
        assert_eq!(restored.payload(), original.payload());
        assert_eq!(restored.causality(), original.causality());
    }

    #[test]
    fn from_json_missing_causality_returns_error() {
        let json = serde_json::json!({ "payload": [1, 2, 3] });
        assert_missing(TracedEvent::from_json_value(&json), "_causality");
    }

    #[test]
    fn from_json_missing_payload_returns_error() {
        let mut json = sample_event("ev-json", b"json-data").to_json_value();
        json.as_object_mut()
            .expect("to_json_value yields a JSON object")
            .remove("payload");
        assert_missing(TracedEvent::from_json_value(&json), "payload");
    }

    #[test]
    fn serde_json_roundtrip() {
        let original =
            TracedEvent::with_id("ev-serde", "user.signup", b"serde-payload", sample_causality())
                .unwrap();
        let json_str = serde_json::to_string(&original).unwrap();
        let restored: TracedEvent = serde_json::from_str(&json_str).unwrap();

        assert_eq!(restored.event_type(), original.event_type());
        assert_eq!(restored.event_id(), original.event_id());
        assert_eq!(restored.payload(), original.payload());
        assert_eq!(restored.causality(), original.causality());
        assert_eq!(restored.timestamp_utc(), original.timestamp_utc());
    }
}