Skip to main content

like_a_clockwork/transport/
binary.rs

1use std::collections::HashMap;
2
3use super::{BinaryHeaderMap, TransportError};
4use crate::event::TracedEvent;
5use crate::lamport::LamportTimestamp;
6use crate::vector::VectorTimestamp;
7
/// Header key under which `inject_lamport` stores the Lamport timestamp as the
/// UTF-8 bytes of its string form.
const LAMPORT_KEY: &str = "causality-lc";
/// Header key under which `inject_vector` stores the MessagePack-encoded clocks
/// of a vector timestamp.
const VECTOR_KEY: &str = "causality-vc";
/// Header key under which `inject_event` stores the event id bytes.
const EVENT_ID_KEY: &str = "causality-eid";
/// Header key under which `inject_event` stores the event type bytes.
const EVENT_TYPE_KEY: &str = "causality-etype";
12
13pub fn inject_lamport(
14    headers: &mut impl BinaryHeaderMap,
15    timestamp: &LamportTimestamp,
16) -> Result<(), TransportError> {
17    headers.set(
18        LAMPORT_KEY.to_string(),
19        timestamp.to_string().into_bytes(),
20    );
21    Ok(())
22}
23
24pub fn extract_lamport(
25    headers: &impl BinaryHeaderMap,
26) -> Result<Option<LamportTimestamp>, TransportError> {
27    match headers.get(LAMPORT_KEY) {
28        None => Ok(None),
29        Some(bytes) => {
30            let s = std::str::from_utf8(bytes).map_err(|e| TransportError::MalformedValue {
31                key: LAMPORT_KEY.to_string(),
32                reason: e.to_string(),
33            })?;
34            s.parse::<LamportTimestamp>()
35                .map(Some)
36                .map_err(|e| TransportError::MalformedValue {
37                    key: LAMPORT_KEY.to_string(),
38                    reason: e.to_string(),
39                })
40        }
41    }
42}
43
44pub fn inject_vector(
45    headers: &mut impl BinaryHeaderMap,
46    timestamp: &VectorTimestamp,
47) -> Result<(), TransportError> {
48    let bytes = rmp_serde::to_vec(timestamp.clocks())
49        .map_err(|e| TransportError::SerializationError(e.to_string()))?;
50    headers.set(VECTOR_KEY.to_string(), bytes);
51    Ok(())
52}
53
54pub fn extract_vector(
55    headers: &impl BinaryHeaderMap,
56) -> Result<Option<VectorTimestamp>, TransportError> {
57    match headers.get(VECTOR_KEY) {
58        None => Ok(None),
59        Some(bytes) => {
60            let clocks: HashMap<String, u64> = rmp_serde::from_slice(bytes)
61                .map_err(|e| TransportError::DeserializationError(e.to_string()))?;
62            Ok(Some(VectorTimestamp::from(clocks)))
63        }
64    }
65}
66
67pub fn inject_event(
68    headers: &mut impl BinaryHeaderMap,
69    event: &TracedEvent,
70) -> Result<(), TransportError> {
71    inject_vector(headers, event.causality())?;
72    headers.set(EVENT_ID_KEY.to_string(), event.event_id().as_bytes().to_vec());
73    headers.set(
74        EVENT_TYPE_KEY.to_string(),
75        event.event_type().as_bytes().to_vec(),
76    );
77    Ok(())
78}
79
80pub fn extract_event(
81    headers: &impl BinaryHeaderMap,
82    payload: &[u8],
83) -> Result<TracedEvent, TransportError> {
84    let causality = extract_vector(headers)?
85        .ok_or_else(|| TransportError::MissingKey(VECTOR_KEY.to_string()))?;
86
87    let event_id_bytes = headers
88        .get(EVENT_ID_KEY)
89        .ok_or_else(|| TransportError::MissingKey(EVENT_ID_KEY.to_string()))?;
90    let event_id = std::str::from_utf8(event_id_bytes).map_err(|e| {
91        TransportError::MalformedValue {
92            key: EVENT_ID_KEY.to_string(),
93            reason: e.to_string(),
94        }
95    })?;
96
97    let event_type_bytes = headers
98        .get(EVENT_TYPE_KEY)
99        .ok_or_else(|| TransportError::MissingKey(EVENT_TYPE_KEY.to_string()))?;
100    let event_type = std::str::from_utf8(event_type_bytes).map_err(|e| {
101        TransportError::MalformedValue {
102            key: EVENT_TYPE_KEY.to_string(),
103            reason: e.to_string(),
104        }
105    })?;
106
107    TracedEvent::with_id(event_id, event_type, payload, causality).map_err(|e| {
108        TransportError::DeserializationError(e.to_string())
109    })
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Concrete header map used by every test in this module.
    type Headers = HashMap<String, Vec<u8>>;

    /// Lamport timestamp fixture parsed from its textual form.
    fn sample_lamport() -> LamportTimestamp {
        "order-svc:42".parse().unwrap()
    }

    /// Vector timestamp fixture with two nodes.
    fn sample_vector() -> VectorTimestamp {
        let clocks: HashMap<String, u64> = [("svc-a", 3), ("svc-b", 1)]
            .iter()
            .map(|&(node, tick)| (node.to_string(), tick))
            .collect();
        VectorTimestamp::from(clocks)
    }

    /// Event fixture carrying `sample_vector()` as its causality.
    fn sample_event() -> TracedEvent {
        TracedEvent::with_id("ev-1", "order.created", b"data", sample_vector()).unwrap()
    }

    /// The Lamport header holds the timestamp's textual form as raw bytes.
    #[test]
    fn binary_inject_lamport_sets_utf8_bytes() {
        let mut map = Headers::new();
        inject_lamport(&mut map, &sample_lamport()).unwrap();

        let stored = BinaryHeaderMap::get(&map, "causality-lc").unwrap();
        assert_eq!(stored, b"order-svc:42");
    }

    /// Injecting then extracting a Lamport timestamp yields the same value.
    #[test]
    fn binary_extract_lamport_roundtrip() {
        let original = sample_lamport();
        let mut map = Headers::new();
        inject_lamport(&mut map, &original).unwrap();

        let recovered = extract_lamport(&map).unwrap().unwrap();
        assert_eq!(recovered, original);
    }

    /// An absent Lamport header is reported as `None`, not as an error.
    #[test]
    fn binary_extract_lamport_missing_returns_none() {
        let map = Headers::new();
        let outcome = extract_lamport(&map).unwrap();
        assert!(outcome.is_none());
    }

    /// The vector header decodes as MessagePack into the original clocks.
    #[test]
    fn binary_inject_vector_produces_msgpack() {
        let mut map = Headers::new();
        inject_vector(&mut map, &sample_vector()).unwrap();

        let stored = BinaryHeaderMap::get(&map, "causality-vc").unwrap();
        // Decoding succeeds only if the stored bytes are valid msgpack.
        let decoded: HashMap<String, u64> = rmp_serde::from_slice(stored).unwrap();
        assert_eq!(decoded.get("svc-a"), Some(&3));
        assert_eq!(decoded.get("svc-b"), Some(&1));
    }

    /// Injecting then extracting a vector timestamp yields the same value.
    #[test]
    fn binary_extract_vector_roundtrip() {
        let original = sample_vector();
        let mut map = Headers::new();
        inject_vector(&mut map, &original).unwrap();

        let recovered = extract_vector(&map).unwrap().unwrap();
        assert_eq!(recovered, original);
    }

    /// An absent vector header is reported as `None`, not as an error.
    #[test]
    fn binary_extract_vector_missing_returns_none() {
        let map = Headers::new();
        let outcome = extract_vector(&map).unwrap();
        assert!(outcome.is_none());
    }

    /// Undecodable vector bytes surface as a deserialization error.
    #[test]
    fn binary_extract_vector_corrupt_bytes_returns_err() {
        let mut map = Headers::new();
        map.insert("causality-vc".to_string(), vec![0xFF, 0xFE, 0xFD]);

        let outcome = extract_vector(&map);
        assert!(outcome.is_err());
        assert!(matches!(
            outcome,
            Err(TransportError::DeserializationError(_))
        ));
    }

    /// The msgpack encoding of the fixture is shorter than its JSON encoding.
    #[test]
    fn binary_msgpack_is_compact() {
        let vector = sample_vector();
        let packed_len = rmp_serde::to_vec(vector.clocks()).unwrap().len();
        let json_len = serde_json::to_vec(vector.clocks()).unwrap().len();
        assert!(
            packed_len < json_len,
            "msgpack ({} bytes) should be smaller than JSON ({} bytes)",
            packed_len,
            json_len
        );
    }

    /// `inject_event` populates the vector, id and type headers.
    #[test]
    fn binary_inject_event_sets_all_headers() {
        let mut map = Headers::new();
        inject_event(&mut map, &sample_event()).unwrap();

        assert!(BinaryHeaderMap::get(&map, "causality-vc").is_some());

        let stored_id = BinaryHeaderMap::get(&map, "causality-eid");
        assert_eq!(stored_id, Some(b"ev-1".as_slice()));

        let stored_type = BinaryHeaderMap::get(&map, "causality-etype");
        assert_eq!(stored_type, Some(b"order.created".as_slice()));
    }

    /// Headers plus payload rebuild an event equal to the original in every field.
    #[test]
    fn binary_extract_event_roundtrip() {
        let original = sample_event();
        let body = original.payload().to_vec();
        let mut map = Headers::new();
        inject_event(&mut map, &original).unwrap();

        let rebuilt = extract_event(&map, &body).unwrap();
        assert_eq!(rebuilt.event_id(), original.event_id());
        assert_eq!(rebuilt.event_type(), original.event_type());
        assert_eq!(rebuilt.payload(), original.payload());
        assert_eq!(rebuilt.causality(), original.causality());
    }
}