Skip to main content

like_a_clockwork/
event.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::vector::VectorTimestamp;
6
7#[derive(Debug, thiserror::Error)]
8pub enum TracedEventError {
9    #[error("missing required header: {0}")]
10    MissingHeader(String),
11
12    #[error("invalid vector clock: {0}")]
13    InvalidVectorClock(String),
14
15    #[error("invalid event type: cannot be empty")]
16    InvalidEventType,
17
18    #[error("json parse error: {0}")]
19    JsonError(#[from] serde_json::Error),
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TracedEvent {
24    event_type: String,
25    payload: Vec<u8>,
26    causality: VectorTimestamp,
27    event_id: String,
28    timestamp_utc: Option<String>,
29}
30
31impl TracedEvent {
32    pub fn new(
33        event_type: &str,
34        payload: &[u8],
35        causality: VectorTimestamp,
36    ) -> Result<Self, TracedEventError> {
37        if event_type.is_empty() {
38            return Err(TracedEventError::InvalidEventType);
39        }
40        Ok(Self {
41            event_type: event_type.to_string(),
42            payload: payload.to_vec(),
43            causality,
44            event_id: uuid::Uuid::now_v7().to_string(),
45            timestamp_utc: None,
46        })
47    }
48
49    pub fn with_id(
50        event_id: &str,
51        event_type: &str,
52        payload: &[u8],
53        causality: VectorTimestamp,
54    ) -> Result<Self, TracedEventError> {
55        if event_type.is_empty() {
56            return Err(TracedEventError::InvalidEventType);
57        }
58        Ok(Self {
59            event_type: event_type.to_string(),
60            payload: payload.to_vec(),
61            causality,
62            event_id: event_id.to_string(),
63            timestamp_utc: None,
64        })
65    }
66
67    pub fn event_type(&self) -> &str {
68        &self.event_type
69    }
70
71    pub fn payload(&self) -> &[u8] {
72        &self.payload
73    }
74
75    pub fn causality(&self) -> &VectorTimestamp {
76        &self.causality
77    }
78
79    pub fn event_id(&self) -> &str {
80        &self.event_id
81    }
82
83    pub fn timestamp_utc(&self) -> Option<&str> {
84        self.timestamp_utc.as_deref()
85    }
86
87    pub fn to_headers(&self) -> HashMap<String, String> {
88        let mut headers = HashMap::new();
89        headers.insert(
90            "X-Causality-Vector".to_string(),
91            self.causality.to_string(),
92        );
93        headers.insert(
94            "X-Causality-EventId".to_string(),
95            self.event_id.clone(),
96        );
97        headers.insert(
98            "X-Causality-EventType".to_string(),
99            self.event_type.clone(),
100        );
101        headers
102    }
103
104    pub fn from_headers(
105        headers: &HashMap<String, String>,
106        payload: &[u8],
107    ) -> Result<Self, TracedEventError> {
108        let vector_str = headers
109            .get("X-Causality-Vector")
110            .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-Vector".to_string()))?;
111        let event_id = headers
112            .get("X-Causality-EventId")
113            .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-EventId".to_string()))?;
114        let event_type = headers
115            .get("X-Causality-EventType")
116            .ok_or_else(|| TracedEventError::MissingHeader("X-Causality-EventType".to_string()))?;
117
118        let causality = vector_str
119            .parse::<VectorTimestamp>()
120            .map_err(|e| TracedEventError::InvalidVectorClock(e.to_string()))?;
121
122        Ok(Self {
123            event_type: event_type.clone(),
124            payload: payload.to_vec(),
125            causality,
126            event_id: event_id.clone(),
127            timestamp_utc: None,
128        })
129    }
130
131    pub fn to_json_value(&self) -> serde_json::Value {
132        serde_json::json!({
133            "_causality": {
134                "vector": serde_json::to_value(self.causality.clocks()).unwrap(),
135                "event_id": self.event_id,
136                "event_type": self.event_type,
137            },
138            "payload": serde_json::to_value(&self.payload).unwrap(),
139        })
140    }
141
142    pub fn from_json_value(value: &serde_json::Value) -> Result<Self, TracedEventError> {
143        let causality_obj = value
144            .get("_causality")
145            .ok_or_else(|| TracedEventError::MissingHeader("_causality".to_string()))?;
146
147        let vector_map: HashMap<String, u64> = serde_json::from_value(
148            causality_obj
149                .get("vector")
150                .ok_or_else(|| TracedEventError::MissingHeader("vector".to_string()))?
151                .clone(),
152        )?;
153
154        let event_id: String = serde_json::from_value(
155            causality_obj
156                .get("event_id")
157                .ok_or_else(|| TracedEventError::MissingHeader("event_id".to_string()))?
158                .clone(),
159        )?;
160
161        let event_type: String = serde_json::from_value(
162            causality_obj
163                .get("event_type")
164                .ok_or_else(|| TracedEventError::MissingHeader("event_type".to_string()))?
165                .clone(),
166        )?;
167
168        let payload: Vec<u8> = serde_json::from_value(
169            value
170                .get("payload")
171                .ok_or_else(|| TracedEventError::MissingHeader("payload".to_string()))?
172                .clone(),
173        )?;
174
175        Ok(Self {
176            event_type,
177            payload,
178            causality: VectorTimestamp::from(vector_map),
179            event_id,
180            timestamp_utc: None,
181        })
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188
189    fn sample_causality() -> VectorTimestamp {
190        let mut clocks = HashMap::new();
191        clocks.insert("svc-a".to_string(), 3);
192        clocks.insert("svc-b".to_string(), 1);
193        VectorTimestamp::from(clocks)
194    }
195
196    #[test]
197    fn new_preserves_event_type() {
198        let event = TracedEvent::new("order.created", b"data", sample_causality()).unwrap();
199        assert_eq!(event.event_type(), "order.created");
200    }
201
202    #[test]
203    fn new_preserves_payload() {
204        let payload = b"hello world";
205        let event = TracedEvent::new("order.created", payload, sample_causality()).unwrap();
206        assert_eq!(event.payload(), payload);
207    }
208
209    #[test]
210    fn new_captures_vector_timestamp() {
211        let causality = sample_causality();
212        let event = TracedEvent::new("order.created", b"data", causality.clone()).unwrap();
213        assert_eq!(event.causality(), &causality);
214    }
215
216    #[test]
217    fn new_generates_unique_event_ids() {
218        let e1 = TracedEvent::new("order.created", b"a", sample_causality()).unwrap();
219        let e2 = TracedEvent::new("order.created", b"b", sample_causality()).unwrap();
220        assert_ne!(e1.event_id(), e2.event_id());
221    }
222
223    #[test]
224    fn new_rejects_empty_event_type() {
225        let result = TracedEvent::new("", b"data", sample_causality());
226        assert!(result.is_err());
227        assert!(matches!(result.unwrap_err(), TracedEventError::InvalidEventType));
228    }
229
230    #[test]
231    fn new_accepts_empty_payload() {
232        let event = TracedEvent::new("order.created", b"", sample_causality()).unwrap();
233        assert!(event.payload().is_empty());
234    }
235
236    #[test]
237    fn with_id_uses_provided_id() {
238        let event =
239            TracedEvent::with_id("my-id-123", "order.created", b"data", sample_causality())
240                .unwrap();
241        assert_eq!(event.event_id(), "my-id-123");
242    }
243
244    #[test]
245    fn accessors_return_correct_values() {
246        let causality = sample_causality();
247        let event =
248            TracedEvent::with_id("ev-1", "user.signup", b"payload", causality.clone()).unwrap();
249        assert_eq!(event.event_type(), "user.signup");
250        assert_eq!(event.payload(), b"payload");
251        assert_eq!(event.causality(), &causality);
252        assert_eq!(event.event_id(), "ev-1");
253        assert_eq!(event.timestamp_utc(), None);
254    }
255
256    #[test]
257    fn to_headers_contains_vector_clock() {
258        let event =
259            TracedEvent::with_id("ev-1", "order.created", b"data", sample_causality()).unwrap();
260        let headers = event.to_headers();
261        let vector = headers.get("X-Causality-Vector").unwrap();
262        assert_eq!(vector, "svc-a=3,svc-b=1");
263    }
264
265    #[test]
266    fn to_headers_contains_event_id() {
267        let event =
268            TracedEvent::with_id("ev-1", "order.created", b"data", sample_causality()).unwrap();
269        let headers = event.to_headers();
270        assert_eq!(headers.get("X-Causality-EventId").unwrap(), "ev-1");
271    }
272
273    #[test]
274    fn to_headers_contains_event_type() {
275        let event =
276            TracedEvent::with_id("ev-1", "order.created", b"data", sample_causality()).unwrap();
277        let headers = event.to_headers();
278        assert_eq!(headers.get("X-Causality-EventType").unwrap(), "order.created");
279    }
280
281    #[test]
282    fn to_headers_does_not_include_payload() {
283        let event =
284            TracedEvent::with_id("ev-1", "order.created", b"secret", sample_causality()).unwrap();
285        let headers = event.to_headers();
286        assert_eq!(headers.len(), 3);
287        for value in headers.values() {
288            assert!(!value.contains("secret"));
289        }
290    }
291
292    #[test]
293    fn from_headers_roundtrip() {
294        let payload = b"roundtrip-data";
295        let original =
296            TracedEvent::with_id("ev-rt", "order.created", payload, sample_causality()).unwrap();
297        let headers = original.to_headers();
298        let restored = TracedEvent::from_headers(&headers, payload).unwrap();
299
300        assert_eq!(restored.event_type(), original.event_type());
301        assert_eq!(restored.event_id(), original.event_id());
302        assert_eq!(restored.payload(), original.payload());
303        assert_eq!(restored.causality(), original.causality());
304    }
305
306    #[test]
307    fn from_headers_missing_vector_returns_error() {
308        let mut headers = HashMap::new();
309        headers.insert("X-Causality-EventId".to_string(), "ev-1".to_string());
310        headers.insert("X-Causality-EventType".to_string(), "order.created".to_string());
311
312        let result = TracedEvent::from_headers(&headers, b"data");
313        assert!(result.is_err());
314        assert!(matches!(
315            result.unwrap_err(),
316            TracedEventError::MissingHeader(h) if h == "X-Causality-Vector"
317        ));
318    }
319
320    #[test]
321    fn from_headers_missing_event_id_returns_error() {
322        let mut headers = HashMap::new();
323        headers.insert("X-Causality-Vector".to_string(), "svc-a=3".to_string());
324        headers.insert("X-Causality-EventType".to_string(), "order.created".to_string());
325
326        let result = TracedEvent::from_headers(&headers, b"data");
327        assert!(result.is_err());
328        assert!(matches!(
329            result.unwrap_err(),
330            TracedEventError::MissingHeader(h) if h == "X-Causality-EventId"
331        ));
332    }
333
334    #[test]
335    fn from_headers_invalid_vector_returns_error() {
336        let mut headers = HashMap::new();
337        headers.insert("X-Causality-Vector".to_string(), "not-valid".to_string());
338        headers.insert("X-Causality-EventId".to_string(), "ev-1".to_string());
339        headers.insert("X-Causality-EventType".to_string(), "order.created".to_string());
340
341        let result = TracedEvent::from_headers(&headers, b"data");
342        assert!(result.is_err());
343        assert!(matches!(
344            result.unwrap_err(),
345            TracedEventError::InvalidVectorClock(_)
346        ));
347    }
348
349    #[test]
350    fn to_json_contains_causality_field() {
351        let event =
352            TracedEvent::with_id("ev-1", "order.created", b"hi", sample_causality()).unwrap();
353        let json = event.to_json_value();
354
355        let causality = json.get("_causality").expect("missing _causality");
356        assert_eq!(causality.get("event_id").unwrap(), "ev-1");
357        assert_eq!(causality.get("event_type").unwrap(), "order.created");
358
359        let vector = causality.get("vector").unwrap().as_object().unwrap();
360        assert_eq!(vector.get("svc-a").unwrap(), 3);
361        assert_eq!(vector.get("svc-b").unwrap(), 1);
362    }
363
364    #[test]
365    fn from_json_roundtrip() {
366        let original =
367            TracedEvent::with_id("ev-json", "order.created", b"json-data", sample_causality())
368                .unwrap();
369        let json = original.to_json_value();
370        let restored = TracedEvent::from_json_value(&json).unwrap();
371
372        assert_eq!(restored.event_type(), original.event_type());
373        assert_eq!(restored.event_id(), original.event_id());
374        assert_eq!(restored.payload(), original.payload());
375        assert_eq!(restored.causality(), original.causality());
376    }
377
378    #[test]
379    fn serde_json_roundtrip() {
380        let original =
381            TracedEvent::with_id("ev-serde", "user.signup", b"serde-payload", sample_causality())
382                .unwrap();
383        let json_str = serde_json::to_string(&original).unwrap();
384        let restored: TracedEvent = serde_json::from_str(&json_str).unwrap();
385
386        assert_eq!(restored.event_type(), original.event_type());
387        assert_eq!(restored.event_id(), original.event_id());
388        assert_eq!(restored.payload(), original.payload());
389        assert_eq!(restored.causality(), original.causality());
390        assert_eq!(restored.timestamp_utc(), original.timestamp_utc());
391    }
392}