composable_runtime/messaging/
message.rs1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use uuid::Uuid;
5
/// Names of the well-known headers that `MessageHeaders` exposes through
/// dedicated accessors.
pub mod header {
    /// Key of the message identifier header.
    pub const ID: &str = "id";

    /// Key of the creation timestamp header.
    pub const TIMESTAMP: &str = "timestamp";

    /// Key of the time-to-live header (units are not established in this file).
    pub const TTL: &str = "ttl";

    /// Key of the header describing the body's content type.
    pub const CONTENT_TYPE: &str = "content-type";

    /// Key of the reply destination header.
    pub const REPLY_TO: &str = "reply-to";

    /// Key of the correlation identifier header.
    pub const CORRELATION_ID: &str = "correlation-id";
}
15
/// A typed header value: text, a signed 64-bit integer, or a boolean.
#[derive(Clone, Debug, PartialEq)]
pub enum HeaderValue {
    /// Owned text.
    String(String),
    /// Signed 64-bit integer.
    Integer(i64),
    /// Boolean flag.
    Bool(bool),
}
23
24impl std::fmt::Display for HeaderValue {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 HeaderValue::String(s) => write!(f, "{s}"),
28 HeaderValue::Integer(n) => write!(f, "{n}"),
29 HeaderValue::Bool(b) => write!(f, "{b}"),
30 }
31 }
32}
33
34impl From<String> for HeaderValue {
35 fn from(s: String) -> Self {
36 HeaderValue::String(s)
37 }
38}
39
40impl From<&str> for HeaderValue {
41 fn from(s: &str) -> Self {
42 HeaderValue::String(s.to_string())
43 }
44}
45
46impl From<i64> for HeaderValue {
47 fn from(n: i64) -> Self {
48 HeaderValue::Integer(n)
49 }
50}
51
52impl From<bool> for HeaderValue {
53 fn from(b: bool) -> Self {
54 HeaderValue::Bool(b)
55 }
56}
57
58pub trait FromHeaderValue<'a>: Sized {
62 fn from_header_value(value: &'a HeaderValue) -> Option<Self>;
63}
64
65impl<'a> FromHeaderValue<'a> for &'a str {
66 fn from_header_value(value: &'a HeaderValue) -> Option<Self> {
67 match value {
68 HeaderValue::String(s) => Some(s.as_str()),
69 _ => None,
70 }
71 }
72}
73
74impl<'a> FromHeaderValue<'a> for &'a HeaderValue {
75 fn from_header_value(value: &'a HeaderValue) -> Option<Self> {
76 Some(value)
77 }
78}
79
80impl FromHeaderValue<'_> for i64 {
81 fn from_header_value(value: &HeaderValue) -> Option<Self> {
82 match value {
83 HeaderValue::Integer(n) => Some(*n),
84 _ => None,
85 }
86 }
87}
88
89impl FromHeaderValue<'_> for u64 {
90 fn from_header_value(value: &HeaderValue) -> Option<Self> {
91 match value {
92 HeaderValue::Integer(n) => (*n).try_into().ok(),
93 _ => None,
94 }
95 }
96}
97
98impl FromHeaderValue<'_> for bool {
99 fn from_header_value(value: &HeaderValue) -> Option<Self> {
100 match value {
101 HeaderValue::Bool(b) => Some(*b),
102 _ => None,
103 }
104 }
105}
106
107#[derive(Debug, Clone, PartialEq)]
113pub struct MessageHeaders {
114 map: HashMap<String, HeaderValue>,
115}
116
117impl MessageHeaders {
118 pub fn id(&self) -> &str {
120 match self.map.get(header::ID) {
121 Some(HeaderValue::String(s)) => s.as_str(),
122 _ => unreachable!("MessageBuilder guarantees id is present"),
123 }
124 }
125
126 pub fn timestamp(&self) -> u64 {
129 match self.map.get(header::TIMESTAMP) {
130 Some(HeaderValue::Integer(n)) => *n as u64,
131 _ => unreachable!("MessageBuilder guarantees timestamp is present"),
132 }
133 }
134
135 pub fn ttl(&self) -> Option<u64> {
137 self.get(header::TTL)
138 }
139
140 pub fn content_type(&self) -> Option<&str> {
142 self.get(header::CONTENT_TYPE)
143 }
144
145 pub fn reply_to(&self) -> Option<&str> {
147 self.get(header::REPLY_TO)
148 }
149
150 pub fn correlation_id(&self) -> Option<&str> {
152 self.get(header::CORRELATION_ID)
153 }
154
155 pub fn get<'a, T: FromHeaderValue<'a>>(&'a self, key: &str) -> Option<T> {
166 self.map.get(key).and_then(T::from_header_value)
167 }
168
169 pub fn iter(&self) -> impl Iterator<Item = (&str, &HeaderValue)> {
171 self.map.iter().map(|(k, v)| (k.as_str(), v))
172 }
173
174 pub fn len(&self) -> usize {
176 self.map.len()
177 }
178
179 pub fn is_empty(&self) -> bool {
181 self.map.is_empty()
182 }
183}
184
185#[derive(Debug, Clone, PartialEq)]
190pub struct Message {
191 headers: MessageHeaders,
192 body: Vec<u8>,
193}
194
195impl Message {
196 pub fn headers(&self) -> &MessageHeaders {
198 &self.headers
199 }
200
201 pub fn body(&self) -> &[u8] {
203 &self.body
204 }
205}
206
207pub struct MessageBuilder {
211 headers: HashMap<String, HeaderValue>,
212 body: Vec<u8>,
213}
214
215impl MessageBuilder {
216 pub fn new(body: Vec<u8>) -> Self {
218 Self {
219 headers: HashMap::new(),
220 body,
221 }
222 }
223
224 pub fn header(mut self, key: impl Into<String>, value: impl Into<HeaderValue>) -> Self {
226 self.headers.insert(key.into(), value.into());
227 self
228 }
229
230 pub fn headers(mut self, headers: HashMap<String, HeaderValue>) -> Self {
232 self.headers.extend(headers);
233 self
234 }
235
236 pub fn build(mut self) -> Message {
238 if !self.headers.contains_key(header::ID) {
239 self.headers.insert(
240 header::ID.to_string(),
241 HeaderValue::String(Uuid::new_v4().to_string()),
242 );
243 }
244
245 if !self.headers.contains_key(header::TIMESTAMP) {
246 let now = SystemTime::now()
247 .duration_since(UNIX_EPOCH)
248 .unwrap()
249 .as_millis() as i64;
250 self.headers
251 .insert(header::TIMESTAMP.to_string(), HeaderValue::Integer(now));
252 }
253
254 Message {
255 headers: MessageHeaders { map: self.headers },
256 body: self.body,
257 }
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare builder fills in both the id and the timestamp.
    #[test]
    fn builder_generates_id_and_timestamp() {
        let message = MessageBuilder::new(b"hello".to_vec()).build();
        let headers = message.headers();

        assert!(!headers.id().is_empty());
        assert!(headers.timestamp() > 0);
        assert_eq!(message.body(), b"hello");
    }

    /// An id supplied by the caller is not replaced by a generated one.
    #[test]
    fn builder_preserves_user_provided_id() {
        let builder = MessageBuilder::new(b"hello".to_vec()).header(header::ID, "custom-id-1");
        let message = builder.build();

        assert_eq!(message.headers().id(), "custom-id-1");
    }

    /// Each well-known header is readable through its dedicated accessor.
    #[test]
    fn builder_with_well_known_headers() {
        let message = MessageBuilder::new(b"{}".to_vec())
            .header(header::CONTENT_TYPE, "application/json")
            .header(header::CORRELATION_ID, "corr-1")
            .header(header::TTL, 5000_i64)
            .header(header::REPLY_TO, "reply-chan")
            .build();
        let headers = message.headers();

        assert_eq!(headers.content_type(), Some("application/json"));
        assert_eq!(headers.correlation_id(), Some("corr-1"));
        assert_eq!(headers.ttl(), Some(5000));
        assert_eq!(headers.reply_to(), Some("reply-chan"));
    }

    /// Arbitrary extension headers round-trip with their original type.
    #[test]
    fn builder_with_extension_headers() {
        let message = MessageBuilder::new(b"{}".to_vec())
            .header("traceparent", "00-abc-def-01")
            .header("x-flag", true)
            .header("x-count", 42_i64)
            .build();
        let headers = message.headers();

        assert_eq!(headers.get::<&str>("traceparent"), Some("00-abc-def-01"));
        assert_eq!(headers.get::<bool>("x-flag"), Some(true));
        assert_eq!(headers.get::<i64>("x-count"), Some(42));
    }

    /// Looking up a key that was never set yields `None`.
    #[test]
    fn get_returns_none_for_missing_key() {
        let message = MessageBuilder::new(b"".to_vec()).build();

        assert_eq!(message.headers().get::<&str>("nonexistent"), None);
    }

    /// Asking for a type the stored value does not have yields `None`.
    #[test]
    fn get_returns_none_for_type_mismatch() {
        let message = MessageBuilder::new(b"".to_vec())
            .header("x-count", 42_i64)
            .build();

        assert_eq!(message.headers().get::<&str>("x-count"), None);
    }

    /// Requesting `&HeaderValue` returns the raw value whatever its variant.
    #[test]
    fn get_raw_header_value() {
        let message = MessageBuilder::new(b"".to_vec())
            .header("x-count", 42_i64)
            .build();

        let expected = HeaderValue::Integer(42);
        assert_eq!(
            message.headers().get::<&HeaderValue>("x-count"),
            Some(&expected)
        );
    }

    /// Headers passed in bulk as a map are merged into the message.
    #[test]
    fn builder_with_hashmap_headers() {
        let user_headers: HashMap<String, HeaderValue> = HashMap::from([
            (
                header::CONTENT_TYPE.to_string(),
                HeaderValue::from("text/plain"),
            ),
            ("x-custom".to_string(), HeaderValue::from("value")),
        ]);

        let message = MessageBuilder::new(b"hello".to_vec())
            .headers(user_headers)
            .build();
        let headers = message.headers();

        assert_eq!(headers.content_type(), Some("text/plain"));
        assert_eq!(headers.get::<&str>("x-custom"), Some("value"));
    }

    /// Iteration covers the generated headers as well as the supplied one.
    #[test]
    fn headers_iter() {
        let message = MessageBuilder::new(b"".to_vec())
            .header(header::CONTENT_TYPE, "text/plain")
            .build();
        let headers = message.headers();

        assert!(headers.len() >= 3);

        let names: Vec<&str> = headers.iter().map(|(name, _value)| name).collect();
        for expected in [header::ID, header::TIMESTAMP, header::CONTENT_TYPE] {
            assert!(names.contains(&expected));
        }
    }
}