Skip to main content

eventbus_core/contract/
message.rs

1use crate::eventbus::Message;
2
3// ---------------------------------------------------------------------------
4// Well-known header keys
5// ---------------------------------------------------------------------------
6
7pub const HEADER_CONTENT_TYPE: &str = "content-type";
8pub const HEADER_EVENT_VERSION: &str = "event-version";
9pub const HEADER_TRACE_PARENT: &str = "traceparent";
10pub const HEADER_TRACE_STATE: &str = "tracestate";
11pub const HEADER_BAGGAGE: &str = "baggage";
12pub const HEADER_IDEMPOTENCY_KEY: &str = "idempotency-key";
13pub const HEADER_RETRY_ATTEMPT: &str = "retry-attempt";
14pub const HEADER_RETRY_REASON: &str = "retry-reason";
15pub const HEADER_DEAD_LETTER_REASON: &str = "dead-letter-reason";
16
17// ---------------------------------------------------------------------------
18// Schema descriptor
19// ---------------------------------------------------------------------------
20
21#[derive(Debug, Clone, Default)]
22pub struct SchemaDescriptor {
23    pub content_type: String,
24    pub event_version: String,
25}
26
27impl SchemaDescriptor {
28    pub fn validate(&self) -> Result<(), crate::error::EventBusError> {
29        if self.content_type.trim().is_empty() {
30            return Err(crate::error::EventBusError::Validation(
31                "content type is required".into(),
32            ));
33        }
34        if self.event_version.trim().is_empty() {
35            return Err(crate::error::EventBusError::Validation(
36                "event version is required".into(),
37            ));
38        }
39        Ok(())
40    }
41}
42
43// ---------------------------------------------------------------------------
44// Trace context
45// ---------------------------------------------------------------------------
46
47#[derive(Debug, Clone, Default)]
48pub struct TraceContext {
49    pub trace_parent: Option<String>,
50    pub trace_state: Option<String>,
51    pub baggage: Option<String>,
52    pub trace_uid: Option<String>,
53    pub correlation_uid: Option<String>,
54}
55
56// ---------------------------------------------------------------------------
57// Message extensions
58// ---------------------------------------------------------------------------
59//
60// Typed fields (`content_type`, `event_version`, `idempotency_key`, `trace_uid`,
61// `correlation_uid`) are the single source of truth for in-process accessors.
62// Headers carry the same values for cross-language wire compatibility (the Go
63// `StreamBus` reads them as headers). On receive, [`Message::normalize`] hoists
64// header values into typed fields so consumers always read from the fields.
65//
66// Maximum length for a `traceparent` value (W3C spec allows up to 55 chars but
67// some vendors emit longer; we cap at 255 to bound memory and reject obvious
68// abuse).
69const MAX_TRACEPARENT_LEN: usize = 255;
70
71impl Message {
72    pub fn set_schema(&mut self, content_type: &str, event_version: &str) {
73        self.headers
74            .insert(HEADER_CONTENT_TYPE.into(), content_type.into());
75        self.headers
76            .insert(HEADER_EVENT_VERSION.into(), event_version.into());
77        self.content_type = Some(content_type.into());
78        self.event_version = Some(event_version.into());
79    }
80
81    pub fn schema(&self) -> SchemaDescriptor {
82        SchemaDescriptor {
83            content_type: self.content_type.clone().unwrap_or_default(),
84            event_version: self.event_version.clone().unwrap_or_default(),
85        }
86    }
87
88    pub fn set_trace_context(
89        &mut self,
90        ctx: &TraceContext,
91    ) -> Result<(), crate::error::EventBusError> {
92        if let Some(ref tp) = ctx.trace_parent {
93            if tp.len() > MAX_TRACEPARENT_LEN {
94                return Err(crate::error::EventBusError::Validation(format!(
95                    "traceparent exceeds maximum length of {MAX_TRACEPARENT_LEN}"
96                )));
97            }
98            self.headers.insert(HEADER_TRACE_PARENT.into(), tp.clone());
99        }
100        if let Some(ref ts) = ctx.trace_state {
101            self.headers.insert(HEADER_TRACE_STATE.into(), ts.clone());
102        }
103        if let Some(ref b) = ctx.baggage {
104            self.headers.insert(HEADER_BAGGAGE.into(), b.clone());
105        }
106        self.trace_uid = ctx.trace_uid.clone();
107        self.correlation_uid = ctx.correlation_uid.clone();
108        Ok(())
109    }
110
111    pub fn trace_context(&self) -> TraceContext {
112        TraceContext {
113            trace_parent: self.headers.get(HEADER_TRACE_PARENT).cloned(),
114            trace_state: self.headers.get(HEADER_TRACE_STATE).cloned(),
115            baggage: self.headers.get(HEADER_BAGGAGE).cloned(),
116            trace_uid: self.trace_uid.clone(),
117            correlation_uid: self.correlation_uid.clone(),
118        }
119    }
120
121    pub fn set_idempotency_key(&mut self, key: &str) {
122        self.idempotency_key = Some(key.into());
123        self.headers
124            .insert(HEADER_IDEMPOTENCY_KEY.into(), key.into());
125    }
126
127    pub fn idempotency_key(&self) -> Option<&str> {
128        self.idempotency_key.as_deref()
129    }
130
131    /// Hoist header values into typed fields when the typed field is unset.
132    ///
133    /// Backends call this after deserializing wire-format messages so that
134    /// downstream consumers can rely on the typed fields as the single source
135    /// of truth. This preserves wire compatibility with producers that only
136    /// set headers (e.g., the Go `StreamBus`).
137    pub fn normalize(&mut self) {
138        if self.content_type.is_none() {
139            if let Some(v) = self.headers.get(HEADER_CONTENT_TYPE) {
140                self.content_type = Some(v.clone());
141            }
142        }
143        if self.event_version.is_none() {
144            if let Some(v) = self.headers.get(HEADER_EVENT_VERSION) {
145                self.event_version = Some(v.clone());
146            }
147        }
148        if self.idempotency_key.is_none() {
149            if let Some(v) = self.headers.get(HEADER_IDEMPOTENCY_KEY) {
150                self.idempotency_key = Some(v.clone());
151            }
152        }
153    }
154}
155
156// ---------------------------------------------------------------------------
157// Tests
158// ---------------------------------------------------------------------------
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use crate::eventbus::Headers;
164    use chrono::Utc;
165
166    fn test_message() -> Message {
167        Message {
168            uid: "test-uid".into(),
169            topic: crate::Topic::new("test.topic").expect("topic"),
170            key: "key".into(),
171            kind: "test.kind".into(),
172            source: "test".into(),
173            occurred_at: Utc::now(),
174            headers: Headers::new(),
175            payload: bytes::Bytes::new(),
176            content_type: None,
177            event_version: None,
178            idempotency_key: None,
179            expires_at: None,
180            trace_uid: None,
181            correlation_uid: None,
182        }
183    }
184
185    #[test]
186    fn schema_roundtrip() {
187        let mut msg = test_message();
188        msg.set_schema("application/json", "v1");
189
190        let schema = msg.schema();
191        assert_eq!(schema.content_type, "application/json");
192        assert_eq!(schema.event_version, "v1");
193    }
194
195    #[test]
196    fn trace_context_roundtrip() {
197        let mut msg = test_message();
198        let trace_uid = "trace-uid".to_string();
199        let correlation_uid = "corr-uid".to_string();
200
201        msg.set_trace_context(&TraceContext {
202            trace_parent: Some("00-abc-def-01".into()),
203            trace_state: Some("vendor=value".into()),
204            baggage: Some("key=value".into()),
205            trace_uid: Some(trace_uid.clone()),
206            correlation_uid: Some(correlation_uid.clone()),
207        })
208        .expect("set_trace_context");
209
210        let ctx = msg.trace_context();
211        assert_eq!(ctx.trace_parent.as_deref(), Some("00-abc-def-01"));
212        assert_eq!(ctx.trace_uid.as_deref(), Some("trace-uid"));
213        assert_eq!(ctx.correlation_uid.as_deref(), Some("corr-uid"));
214    }
215
216    #[test]
217    fn set_trace_context_rejects_oversized_traceparent() {
218        let mut msg = test_message();
219        let huge = "a".repeat(MAX_TRACEPARENT_LEN + 1);
220        let res = msg.set_trace_context(&TraceContext {
221            trace_parent: Some(huge),
222            ..Default::default()
223        });
224        assert!(res.is_err());
225    }
226
227    #[test]
228    fn idempotency_key_roundtrip() {
229        let mut msg = test_message();
230        msg.set_idempotency_key("idem-123");
231
232        assert_eq!(msg.idempotency_key(), Some("idem-123"));
233        assert_eq!(
234            msg.headers.get(HEADER_IDEMPOTENCY_KEY).map(|s| s.as_str()),
235            Some("idem-123"),
236        );
237    }
238
239    #[test]
240    fn idempotency_key_reads_only_typed_field() {
241        // Without `normalize`, a header-only message must NOT leak through —
242        // typed field is the in-process source of truth.
243        let mut msg = test_message();
244        msg.headers
245            .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
246
247        assert_eq!(msg.idempotency_key(), None);
248    }
249
250    #[test]
251    fn normalize_hoists_headers_into_typed_fields() {
252        // Backends call `normalize()` at the wire boundary so consumers can
253        // rely on typed fields regardless of which side wrote the wire form.
254        let mut msg = test_message();
255        msg.headers
256            .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
257        msg.headers
258            .insert(HEADER_CONTENT_TYPE.into(), "application/json".into());
259        msg.headers.insert(HEADER_EVENT_VERSION.into(), "v2".into());
260
261        msg.normalize();
262
263        assert_eq!(msg.idempotency_key(), Some("from-header"));
264        assert_eq!(msg.content_type.as_deref(), Some("application/json"));
265        assert_eq!(msg.event_version.as_deref(), Some("v2"));
266    }
267
268    #[test]
269    fn normalize_does_not_overwrite_explicit_typed_fields() {
270        let mut msg = test_message();
271        msg.idempotency_key = Some("from-field".into());
272        msg.headers
273            .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
274
275        msg.normalize();
276
277        assert_eq!(msg.idempotency_key(), Some("from-field"));
278    }
279
280    #[test]
281    fn schema_descriptor_validate() {
282        let valid = SchemaDescriptor {
283            content_type: "application/json".into(),
284            event_version: "v1".into(),
285        };
286        assert!(valid.validate().is_ok());
287
288        let invalid = SchemaDescriptor {
289            content_type: "".into(),
290            event_version: "v1".into(),
291        };
292        assert!(invalid.validate().is_err());
293    }
294}