Skip to main content

composable_runtime/messaging/
message.rs

1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use uuid::Uuid;
5
/// Keys of the well-known headers.
///
/// Each constant is the exact map key under which the corresponding header is
/// stored.
pub mod header {
    /// Key of the unique message identifier.
    pub const ID: &str = "id";
    /// Key of the message creation time (Unix epoch milliseconds).
    pub const TIMESTAMP: &str = "timestamp";
    /// Key of the time-to-live, in milliseconds.
    pub const TTL: &str = "ttl";
    /// Key of the MIME type of the body.
    pub const CONTENT_TYPE: &str = "content-type";
    /// Key of the return address for replies.
    pub const REPLY_TO: &str = "reply-to";
    /// Key of the identifier linking related messages.
    pub const CORRELATION_ID: &str = "correlation-id";
}
15
/// A header value, which is one of a small set of primitive types.
#[derive(Debug, Clone, PartialEq)]
pub enum HeaderValue {
    /// Owned text.
    String(String),
    /// Signed 64-bit integer.
    Integer(i64),
    /// Boolean flag.
    Bool(bool),
}

/// Renders the inner value on its own, without naming the variant.
impl std::fmt::Display for HeaderValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::String(text) => f.write_str(text),
            Self::Integer(number) => write!(f, "{}", number),
            Self::Bool(flag) => write!(f, "{}", flag),
        }
    }
}

/// Wraps an owned string without copying it.
impl From<String> for HeaderValue {
    fn from(text: String) -> Self {
        Self::String(text)
    }
}

/// Copies a borrowed string into an owned [`HeaderValue::String`].
impl From<&str> for HeaderValue {
    fn from(text: &str) -> Self {
        Self::String(text.to_owned())
    }
}

/// Wraps an integer as [`HeaderValue::Integer`].
impl From<i64> for HeaderValue {
    fn from(number: i64) -> Self {
        Self::Integer(number)
    }
}

/// Wraps a boolean as [`HeaderValue::Bool`].
impl From<bool> for HeaderValue {
    fn from(flag: bool) -> Self {
        Self::Bool(flag)
    }
}
57
58/// Trait for extracting typed values from a [`HeaderValue`].
59///
60/// Enables `message.headers().get::<&str>("key")` with type inference.
61pub trait FromHeaderValue<'a>: Sized {
62    fn from_header_value(value: &'a HeaderValue) -> Option<Self>;
63}
64
65impl<'a> FromHeaderValue<'a> for &'a str {
66    fn from_header_value(value: &'a HeaderValue) -> Option<Self> {
67        match value {
68            HeaderValue::String(s) => Some(s.as_str()),
69            _ => None,
70        }
71    }
72}
73
74impl<'a> FromHeaderValue<'a> for &'a HeaderValue {
75    fn from_header_value(value: &'a HeaderValue) -> Option<Self> {
76        Some(value)
77    }
78}
79
80impl FromHeaderValue<'_> for i64 {
81    fn from_header_value(value: &HeaderValue) -> Option<Self> {
82        match value {
83            HeaderValue::Integer(n) => Some(*n),
84            _ => None,
85        }
86    }
87}
88
89impl FromHeaderValue<'_> for u64 {
90    fn from_header_value(value: &HeaderValue) -> Option<Self> {
91        match value {
92            HeaderValue::Integer(n) => (*n).try_into().ok(),
93            _ => None,
94        }
95    }
96}
97
98impl FromHeaderValue<'_> for bool {
99    fn from_header_value(value: &HeaderValue) -> Option<Self> {
100        match value {
101            HeaderValue::Bool(b) => Some(*b),
102            _ => None,
103        }
104    }
105}
106
107/// Message headers backed by a uniform key-value map.
108///
109/// Well-known headers have typed accessor methods. All headers (well-known and
110/// custom) are stored in the same map and accessible via the generic
111/// [`get`](MessageHeaders::get) method.
112#[derive(Debug, Clone, PartialEq)]
113pub struct MessageHeaders {
114    map: HashMap<String, HeaderValue>,
115}
116
117impl MessageHeaders {
118    /// Unique message identifier. Always present on a constructed [`Message`].
119    pub fn id(&self) -> &str {
120        match self.map.get(header::ID) {
121            Some(HeaderValue::String(s)) => s.as_str(),
122            _ => unreachable!("MessageBuilder guarantees id is present"),
123        }
124    }
125
126    /// Message creation time (Unix epoch milliseconds). Always present on a
127    /// constructed [`Message`].
128    pub fn timestamp(&self) -> u64 {
129        match self.map.get(header::TIMESTAMP) {
130            Some(HeaderValue::Integer(n)) => *n as u64,
131            _ => unreachable!("MessageBuilder guarantees timestamp is present"),
132        }
133    }
134
135    /// Time-to-live in milliseconds.
136    pub fn ttl(&self) -> Option<u64> {
137        self.get(header::TTL)
138    }
139
140    /// MIME type of the body.
141    pub fn content_type(&self) -> Option<&str> {
142        self.get(header::CONTENT_TYPE)
143    }
144
145    /// Return address for replies.
146    pub fn reply_to(&self) -> Option<&str> {
147        self.get(header::REPLY_TO)
148    }
149
150    /// Links related messages across a conversation or task.
151    pub fn correlation_id(&self) -> Option<&str> {
152        self.get(header::CORRELATION_ID)
153    }
154
155    /// Get a header value by key.
156    ///
157    /// Returns `None` if the key is absent or the value is a different type.
158    ///
159    /// ```ignore
160    /// let ct: Option<&str> = headers.get("content-type");
161    /// let ttl: Option<u64> = headers.get("ttl");
162    /// let custom: Option<bool> = headers.get("x-flag");
163    /// let raw: Option<&HeaderValue> = headers.get("x-anything");
164    /// ```
165    pub fn get<'a, T: FromHeaderValue<'a>>(&'a self, key: &str) -> Option<T> {
166        self.map.get(key).and_then(T::from_header_value)
167    }
168
169    /// Iterate over all headers as key-value pairs.
170    pub fn iter(&self) -> impl Iterator<Item = (&str, &HeaderValue)> {
171        self.map.iter().map(|(k, v)| (k.as_str(), v))
172    }
173
174    /// Number of headers.
175    pub fn len(&self) -> usize {
176        self.map.len()
177    }
178
179    /// Whether the headers map is empty.
180    pub fn is_empty(&self) -> bool {
181        self.map.is_empty()
182    }
183}
184
185/// The internal message type.
186///
187/// Body is opaque bytes with content-type.
188/// Only constructable via [`MessageBuilder`].
189#[derive(Debug, Clone, PartialEq)]
190pub struct Message {
191    headers: MessageHeaders,
192    body: Vec<u8>,
193}
194
195impl Message {
196    /// Message headers.
197    pub fn headers(&self) -> &MessageHeaders {
198        &self.headers
199    }
200
201    /// Message body as raw bytes.
202    pub fn body(&self) -> &[u8] {
203        &self.body
204    }
205}
206
207/// Builder for constructing a [`Message`].
208///
209/// Generates `id` and `timestamp` if not already provided in the headers.
210pub struct MessageBuilder {
211    headers: HashMap<String, HeaderValue>,
212    body: Vec<u8>,
213}
214
215impl MessageBuilder {
216    /// Create a builder with the given body.
217    pub fn new(body: Vec<u8>) -> Self {
218        Self {
219            headers: HashMap::new(),
220            body,
221        }
222    }
223
224    /// Set a header.
225    pub fn header(mut self, key: impl Into<String>, value: impl Into<HeaderValue>) -> Self {
226        self.headers.insert(key.into(), value.into());
227        self
228    }
229
230    /// Merge user-provided headers into the builder.
231    pub fn headers(mut self, headers: HashMap<String, HeaderValue>) -> Self {
232        self.headers.extend(headers);
233        self
234    }
235
236    /// Build the [`Message`].
237    pub fn build(mut self) -> Message {
238        if !self.headers.contains_key(header::ID) {
239            self.headers.insert(
240                header::ID.to_string(),
241                HeaderValue::String(Uuid::new_v4().to_string()),
242            );
243        }
244
245        if !self.headers.contains_key(header::TIMESTAMP) {
246            let now = SystemTime::now()
247                .duration_since(UNIX_EPOCH)
248                .unwrap()
249                .as_millis() as i64;
250            self.headers
251                .insert(header::TIMESTAMP.to_string(), HeaderValue::Integer(now));
252        }
253
254        Message {
255            headers: MessageHeaders { map: self.headers },
256            body: self.body,
257        }
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a builder whose body is a copy of `body`.
    fn builder_for(body: &[u8]) -> MessageBuilder {
        MessageBuilder::new(body.to_vec())
    }

    /// A message built without headers still gets an id and a timestamp.
    #[test]
    fn builder_generates_id_and_timestamp() {
        let message = builder_for(b"hello").build();
        let headers = message.headers();

        assert!(!headers.id().is_empty());
        assert!(headers.timestamp() > 0);
        assert_eq!(message.body(), b"hello");
    }

    /// A caller-supplied id is not replaced by a generated one.
    #[test]
    fn builder_preserves_user_provided_id() {
        let message = builder_for(b"hello")
            .header(header::ID, "custom-id-1")
            .build();

        assert_eq!(message.headers().id(), "custom-id-1");
    }

    /// Well-known headers are readable through their typed accessors.
    #[test]
    fn builder_with_well_known_headers() {
        let message = builder_for(b"{}")
            .header(header::CONTENT_TYPE, "application/json")
            .header(header::CORRELATION_ID, "corr-1")
            .header(header::TTL, 5000_i64)
            .header(header::REPLY_TO, "reply-chan")
            .build();
        let headers = message.headers();

        assert_eq!(headers.content_type(), Some("application/json"));
        assert_eq!(headers.correlation_id(), Some("corr-1"));
        assert_eq!(headers.ttl(), Some(5000));
        assert_eq!(headers.reply_to(), Some("reply-chan"));
    }

    /// Custom headers of each value type round-trip through `get`.
    #[test]
    fn builder_with_extension_headers() {
        let message = builder_for(b"{}")
            .header("traceparent", "00-abc-def-01")
            .header("x-flag", true)
            .header("x-count", 42_i64)
            .build();
        let headers = message.headers();

        assert_eq!(headers.get::<&str>("traceparent"), Some("00-abc-def-01"));
        assert_eq!(headers.get::<bool>("x-flag"), Some(true));
        assert_eq!(headers.get::<i64>("x-count"), Some(42));
    }

    /// Looking up a key that was never set yields `None`.
    #[test]
    fn get_returns_none_for_missing_key() {
        let message = builder_for(b"").build();

        assert_eq!(message.headers().get::<&str>("nonexistent"), None);
    }

    /// Asking for a type other than the stored one yields `None`.
    #[test]
    fn get_returns_none_for_type_mismatch() {
        let message = builder_for(b"").header("x-count", 42_i64).build();

        // x-count is an Integer, requesting &str returns None
        assert_eq!(message.headers().get::<&str>("x-count"), None);
    }

    /// The raw `HeaderValue` can be requested regardless of its variant.
    #[test]
    fn get_raw_header_value() {
        let message = builder_for(b"").header("x-count", 42_i64).build();

        assert_eq!(
            message.headers().get::<&HeaderValue>("x-count"),
            Some(&HeaderValue::Integer(42))
        );
    }

    /// Headers supplied as a whole map are merged into the message.
    #[test]
    fn builder_with_hashmap_headers() {
        let user_headers: HashMap<String, HeaderValue> = vec![
            (
                header::CONTENT_TYPE.to_string(),
                HeaderValue::from("text/plain"),
            ),
            ("x-custom".to_string(), HeaderValue::from("value")),
        ]
        .into_iter()
        .collect();

        let message = builder_for(b"hello").headers(user_headers).build();
        let headers = message.headers();

        assert_eq!(headers.content_type(), Some("text/plain"));
        assert_eq!(headers.get::<&str>("x-custom"), Some("value"));
    }

    /// Iteration exposes generated and user-provided headers alike.
    #[test]
    fn headers_iter() {
        let message = builder_for(b"")
            .header(header::CONTENT_TYPE, "text/plain")
            .build();
        let headers = message.headers();

        // At minimum: id, timestamp, content-type
        assert!(headers.len() >= 3);

        for expected in [header::ID, header::TIMESTAMP, header::CONTENT_TYPE] {
            assert!(headers.iter().any(|(key, _)| key == expected));
        }
    }
}