eventbus_core/contract/
message.rs1use crate::eventbus::Message;
2
3pub const HEADER_CONTENT_TYPE: &str = "content-type";
8pub const HEADER_EVENT_VERSION: &str = "event-version";
9pub const HEADER_TRACE_PARENT: &str = "traceparent";
10pub const HEADER_TRACE_STATE: &str = "tracestate";
11pub const HEADER_BAGGAGE: &str = "baggage";
12pub const HEADER_IDEMPOTENCY_KEY: &str = "idempotency-key";
13pub const HEADER_RETRY_ATTEMPT: &str = "retry-attempt";
14pub const HEADER_RETRY_REASON: &str = "retry-reason";
15pub const HEADER_DEAD_LETTER_REASON: &str = "dead-letter-reason";
16
17#[derive(Debug, Clone, Default)]
22pub struct SchemaDescriptor {
23 pub content_type: String,
24 pub event_version: String,
25}
26
27impl SchemaDescriptor {
28 pub fn validate(&self) -> Result<(), crate::error::EventBusError> {
29 if self.content_type.trim().is_empty() {
30 return Err(crate::error::EventBusError::Validation(
31 "content type is required".into(),
32 ));
33 }
34 if self.event_version.trim().is_empty() {
35 return Err(crate::error::EventBusError::Validation(
36 "event version is required".into(),
37 ));
38 }
39 Ok(())
40 }
41}
42
43#[derive(Debug, Clone, Default)]
48pub struct TraceContext {
49 pub trace_parent: Option<String>,
50 pub trace_state: Option<String>,
51 pub baggage: Option<String>,
52 pub trace_uid: Option<String>,
53 pub correlation_uid: Option<String>,
54}
55
56const MAX_TRACEPARENT_LEN: usize = 255;
70
71impl Message {
72 pub fn set_schema(&mut self, content_type: &str, event_version: &str) {
73 self.headers
74 .insert(HEADER_CONTENT_TYPE.into(), content_type.into());
75 self.headers
76 .insert(HEADER_EVENT_VERSION.into(), event_version.into());
77 self.content_type = Some(content_type.into());
78 self.event_version = Some(event_version.into());
79 }
80
81 pub fn schema(&self) -> SchemaDescriptor {
82 SchemaDescriptor {
83 content_type: self.content_type.clone().unwrap_or_default(),
84 event_version: self.event_version.clone().unwrap_or_default(),
85 }
86 }
87
88 pub fn set_trace_context(
89 &mut self,
90 ctx: &TraceContext,
91 ) -> Result<(), crate::error::EventBusError> {
92 if let Some(ref tp) = ctx.trace_parent {
93 if tp.len() > MAX_TRACEPARENT_LEN {
94 return Err(crate::error::EventBusError::Validation(format!(
95 "traceparent exceeds maximum length of {MAX_TRACEPARENT_LEN}"
96 )));
97 }
98 self.headers.insert(HEADER_TRACE_PARENT.into(), tp.clone());
99 }
100 if let Some(ref ts) = ctx.trace_state {
101 self.headers.insert(HEADER_TRACE_STATE.into(), ts.clone());
102 }
103 if let Some(ref b) = ctx.baggage {
104 self.headers.insert(HEADER_BAGGAGE.into(), b.clone());
105 }
106 self.trace_uid = ctx.trace_uid.clone();
107 self.correlation_uid = ctx.correlation_uid.clone();
108 Ok(())
109 }
110
111 pub fn trace_context(&self) -> TraceContext {
112 TraceContext {
113 trace_parent: self.headers.get(HEADER_TRACE_PARENT).cloned(),
114 trace_state: self.headers.get(HEADER_TRACE_STATE).cloned(),
115 baggage: self.headers.get(HEADER_BAGGAGE).cloned(),
116 trace_uid: self.trace_uid.clone(),
117 correlation_uid: self.correlation_uid.clone(),
118 }
119 }
120
121 pub fn set_idempotency_key(&mut self, key: &str) {
122 self.idempotency_key = Some(key.into());
123 self.headers
124 .insert(HEADER_IDEMPOTENCY_KEY.into(), key.into());
125 }
126
127 pub fn idempotency_key(&self) -> Option<&str> {
128 self.idempotency_key.as_deref()
129 }
130
131 pub fn normalize(&mut self) {
138 if self.content_type.is_none() {
139 if let Some(v) = self.headers.get(HEADER_CONTENT_TYPE) {
140 self.content_type = Some(v.clone());
141 }
142 }
143 if self.event_version.is_none() {
144 if let Some(v) = self.headers.get(HEADER_EVENT_VERSION) {
145 self.event_version = Some(v.clone());
146 }
147 }
148 if self.idempotency_key.is_none() {
149 if let Some(v) = self.headers.get(HEADER_IDEMPOTENCY_KEY) {
150 self.idempotency_key = Some(v.clone());
151 }
152 }
153 }
154}
155
156#[cfg(test)]
161mod tests {
162 use super::*;
163 use crate::eventbus::Headers;
164 use chrono::Utc;
165
166 fn test_message() -> Message {
167 Message {
168 uid: "test-uid".into(),
169 topic: crate::Topic::new("test.topic").expect("topic"),
170 key: "key".into(),
171 kind: "test.kind".into(),
172 source: "test".into(),
173 occurred_at: Utc::now(),
174 headers: Headers::new(),
175 payload: bytes::Bytes::new(),
176 content_type: None,
177 event_version: None,
178 idempotency_key: None,
179 expires_at: None,
180 trace_uid: None,
181 correlation_uid: None,
182 }
183 }
184
185 #[test]
186 fn schema_roundtrip() {
187 let mut msg = test_message();
188 msg.set_schema("application/json", "v1");
189
190 let schema = msg.schema();
191 assert_eq!(schema.content_type, "application/json");
192 assert_eq!(schema.event_version, "v1");
193 }
194
195 #[test]
196 fn trace_context_roundtrip() {
197 let mut msg = test_message();
198 let trace_uid = "trace-uid".to_string();
199 let correlation_uid = "corr-uid".to_string();
200
201 msg.set_trace_context(&TraceContext {
202 trace_parent: Some("00-abc-def-01".into()),
203 trace_state: Some("vendor=value".into()),
204 baggage: Some("key=value".into()),
205 trace_uid: Some(trace_uid.clone()),
206 correlation_uid: Some(correlation_uid.clone()),
207 })
208 .expect("set_trace_context");
209
210 let ctx = msg.trace_context();
211 assert_eq!(ctx.trace_parent.as_deref(), Some("00-abc-def-01"));
212 assert_eq!(ctx.trace_uid.as_deref(), Some("trace-uid"));
213 assert_eq!(ctx.correlation_uid.as_deref(), Some("corr-uid"));
214 }
215
216 #[test]
217 fn set_trace_context_rejects_oversized_traceparent() {
218 let mut msg = test_message();
219 let huge = "a".repeat(MAX_TRACEPARENT_LEN + 1);
220 let res = msg.set_trace_context(&TraceContext {
221 trace_parent: Some(huge),
222 ..Default::default()
223 });
224 assert!(res.is_err());
225 }
226
227 #[test]
228 fn idempotency_key_roundtrip() {
229 let mut msg = test_message();
230 msg.set_idempotency_key("idem-123");
231
232 assert_eq!(msg.idempotency_key(), Some("idem-123"));
233 assert_eq!(
234 msg.headers.get(HEADER_IDEMPOTENCY_KEY).map(|s| s.as_str()),
235 Some("idem-123"),
236 );
237 }
238
239 #[test]
240 fn idempotency_key_reads_only_typed_field() {
241 let mut msg = test_message();
244 msg.headers
245 .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
246
247 assert_eq!(msg.idempotency_key(), None);
248 }
249
250 #[test]
251 fn normalize_hoists_headers_into_typed_fields() {
252 let mut msg = test_message();
255 msg.headers
256 .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
257 msg.headers
258 .insert(HEADER_CONTENT_TYPE.into(), "application/json".into());
259 msg.headers.insert(HEADER_EVENT_VERSION.into(), "v2".into());
260
261 msg.normalize();
262
263 assert_eq!(msg.idempotency_key(), Some("from-header"));
264 assert_eq!(msg.content_type.as_deref(), Some("application/json"));
265 assert_eq!(msg.event_version.as_deref(), Some("v2"));
266 }
267
268 #[test]
269 fn normalize_does_not_overwrite_explicit_typed_fields() {
270 let mut msg = test_message();
271 msg.idempotency_key = Some("from-field".into());
272 msg.headers
273 .insert(HEADER_IDEMPOTENCY_KEY.into(), "from-header".into());
274
275 msg.normalize();
276
277 assert_eq!(msg.idempotency_key(), Some("from-field"));
278 }
279
280 #[test]
281 fn schema_descriptor_validate() {
282 let valid = SchemaDescriptor {
283 content_type: "application/json".into(),
284 event_version: "v1".into(),
285 };
286 assert!(valid.validate().is_ok());
287
288 let invalid = SchemaDescriptor {
289 content_type: "".into(),
290 event_version: "v1".into(),
291 };
292 assert!(invalid.validate().is_err());
293 }
294}