like_a_clockwork/transport/
binary.rs1use std::collections::HashMap;
2
3use super::{BinaryHeaderMap, TransportError};
4use crate::event::TracedEvent;
5use crate::lamport::LamportTimestamp;
6use crate::vector::VectorTimestamp;
7
/// Header key under which the Lamport timestamp is stored.
const LAMPORT_KEY: &str = "causality-lc";
/// Header key under which the serialized vector clock is stored.
const VECTOR_KEY: &str = "causality-vc";
/// Header key under which an event's identifier is stored.
const EVENT_ID_KEY: &str = "causality-eid";
/// Header key under which an event's type name is stored.
const EVENT_TYPE_KEY: &str = "causality-etype";
12
13pub fn inject_lamport(
14 headers: &mut impl BinaryHeaderMap,
15 timestamp: &LamportTimestamp,
16) -> Result<(), TransportError> {
17 headers.set(
18 LAMPORT_KEY.to_string(),
19 timestamp.to_string().into_bytes(),
20 );
21 Ok(())
22}
23
24pub fn extract_lamport(
25 headers: &impl BinaryHeaderMap,
26) -> Result<Option<LamportTimestamp>, TransportError> {
27 match headers.get(LAMPORT_KEY) {
28 None => Ok(None),
29 Some(bytes) => {
30 let s = std::str::from_utf8(bytes).map_err(|e| TransportError::MalformedValue {
31 key: LAMPORT_KEY.to_string(),
32 reason: e.to_string(),
33 })?;
34 s.parse::<LamportTimestamp>()
35 .map(Some)
36 .map_err(|e| TransportError::MalformedValue {
37 key: LAMPORT_KEY.to_string(),
38 reason: e.to_string(),
39 })
40 }
41 }
42}
43
44pub fn inject_vector(
45 headers: &mut impl BinaryHeaderMap,
46 timestamp: &VectorTimestamp,
47) -> Result<(), TransportError> {
48 let bytes = rmp_serde::to_vec(timestamp.clocks())
49 .map_err(|e| TransportError::SerializationError(e.to_string()))?;
50 headers.set(VECTOR_KEY.to_string(), bytes);
51 Ok(())
52}
53
54pub fn extract_vector(
55 headers: &impl BinaryHeaderMap,
56) -> Result<Option<VectorTimestamp>, TransportError> {
57 match headers.get(VECTOR_KEY) {
58 None => Ok(None),
59 Some(bytes) => {
60 let clocks: HashMap<String, u64> = rmp_serde::from_slice(bytes)
61 .map_err(|e| TransportError::DeserializationError(e.to_string()))?;
62 Ok(Some(VectorTimestamp::from(clocks)))
63 }
64 }
65}
66
67pub fn inject_event(
68 headers: &mut impl BinaryHeaderMap,
69 event: &TracedEvent,
70) -> Result<(), TransportError> {
71 inject_vector(headers, event.causality())?;
72 headers.set(EVENT_ID_KEY.to_string(), event.event_id().as_bytes().to_vec());
73 headers.set(
74 EVENT_TYPE_KEY.to_string(),
75 event.event_type().as_bytes().to_vec(),
76 );
77 Ok(())
78}
79
80pub fn extract_event(
81 headers: &impl BinaryHeaderMap,
82 payload: &[u8],
83) -> Result<TracedEvent, TransportError> {
84 let causality = extract_vector(headers)?
85 .ok_or_else(|| TransportError::MissingKey(VECTOR_KEY.to_string()))?;
86
87 let event_id_bytes = headers
88 .get(EVENT_ID_KEY)
89 .ok_or_else(|| TransportError::MissingKey(EVENT_ID_KEY.to_string()))?;
90 let event_id = std::str::from_utf8(event_id_bytes).map_err(|e| {
91 TransportError::MalformedValue {
92 key: EVENT_ID_KEY.to_string(),
93 reason: e.to_string(),
94 }
95 })?;
96
97 let event_type_bytes = headers
98 .get(EVENT_TYPE_KEY)
99 .ok_or_else(|| TransportError::MissingKey(EVENT_TYPE_KEY.to_string()))?;
100 let event_type = std::str::from_utf8(event_type_bytes).map_err(|e| {
101 TransportError::MalformedValue {
102 key: EVENT_TYPE_KEY.to_string(),
103 reason: e.to_string(),
104 }
105 })?;
106
107 TracedEvent::with_id(event_id, event_type, payload, causality).map_err(|e| {
108 TransportError::DeserializationError(e.to_string())
109 })
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain in-memory header carrier shared by every test in this module.
    type Headers = HashMap<String, Vec<u8>>;

    /// Lamport timestamp parsed from the literal `order-svc:42`.
    fn sample_lamport() -> LamportTimestamp {
        "order-svc:42".parse::<LamportTimestamp>().unwrap()
    }

    /// Vector timestamp with `svc-a` at 3 and `svc-b` at 1.
    fn sample_vector() -> VectorTimestamp {
        let clocks: HashMap<String, u64> = [("svc-a", 3), ("svc-b", 1)]
            .iter()
            .map(|&(node, tick)| (node.to_string(), tick))
            .collect();
        VectorTimestamp::from(clocks)
    }

    /// Event `ev-1` of type `order.created` carrying the payload `data`.
    fn sample_event() -> TracedEvent {
        let causality = sample_vector();
        TracedEvent::with_id("ev-1", "order.created", b"data", causality).unwrap()
    }

    #[test]
    fn binary_inject_lamport_sets_utf8_bytes() {
        let mut carrier = Headers::new();
        inject_lamport(&mut carrier, &sample_lamport()).unwrap();

        let stored = BinaryHeaderMap::get(&carrier, "causality-lc").unwrap();
        assert_eq!(stored, b"order-svc:42");
    }

    #[test]
    fn binary_extract_lamport_roundtrip() {
        let original = sample_lamport();
        let mut carrier = Headers::new();
        inject_lamport(&mut carrier, &original).unwrap();

        let recovered = extract_lamport(&carrier).unwrap().unwrap();
        assert_eq!(recovered, original);
    }

    #[test]
    fn binary_extract_lamport_missing_returns_none() {
        let carrier = Headers::new();
        let outcome = extract_lamport(&carrier).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn binary_inject_vector_produces_msgpack() {
        let mut carrier = Headers::new();
        inject_vector(&mut carrier, &sample_vector()).unwrap();

        // The stored value must decode as a MessagePack map of clocks.
        let stored = BinaryHeaderMap::get(&carrier, "causality-vc").unwrap();
        let decoded: HashMap<String, u64> = rmp_serde::from_slice(stored).unwrap();
        assert_eq!(decoded.get("svc-a"), Some(&3));
        assert_eq!(decoded.get("svc-b"), Some(&1));
    }

    #[test]
    fn binary_extract_vector_roundtrip() {
        let original = sample_vector();
        let mut carrier = Headers::new();
        inject_vector(&mut carrier, &original).unwrap();

        let recovered = extract_vector(&carrier).unwrap().unwrap();
        assert_eq!(recovered, original);
    }

    #[test]
    fn binary_extract_vector_missing_returns_none() {
        let carrier = Headers::new();
        let outcome = extract_vector(&carrier).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn binary_extract_vector_corrupt_bytes_returns_err() {
        let mut carrier = Headers::new();
        carrier.insert("causality-vc".to_string(), vec![0xFF, 0xFE, 0xFD]);

        let outcome = extract_vector(&carrier);
        assert!(outcome.is_err());
        assert!(matches!(
            outcome,
            Err(TransportError::DeserializationError(_))
        ));
    }

    #[test]
    fn binary_msgpack_is_compact() {
        let vector = sample_vector();
        let msgpack_len = rmp_serde::to_vec(vector.clocks()).unwrap().len();
        let json_len = serde_json::to_vec(vector.clocks()).unwrap().len();
        assert!(
            msgpack_len < json_len,
            "msgpack ({} bytes) should be smaller than JSON ({} bytes)",
            msgpack_len,
            json_len
        );
    }

    #[test]
    fn binary_inject_event_sets_all_headers() {
        let mut carrier = Headers::new();
        inject_event(&mut carrier, &sample_event()).unwrap();

        assert!(BinaryHeaderMap::get(&carrier, "causality-vc").is_some());
        assert_eq!(
            BinaryHeaderMap::get(&carrier, "causality-eid"),
            Some(&b"ev-1"[..])
        );
        assert_eq!(
            BinaryHeaderMap::get(&carrier, "causality-etype"),
            Some(&b"order.created"[..])
        );
    }

    #[test]
    fn binary_extract_event_roundtrip() {
        let original = sample_event();
        let body = original.payload().to_vec();
        let mut carrier = Headers::new();
        inject_event(&mut carrier, &original).unwrap();

        let recovered = extract_event(&carrier, &body).unwrap();
        assert_eq!(recovered.event_id(), original.event_id());
        assert_eq!(recovered.event_type(), original.event_type());
        assert_eq!(recovered.payload(), original.payload());
        assert_eq!(recovered.causality(), original.causality());
    }
}